Skip to content

Commit

Permalink
Sync with latest P4
Browse files Browse the repository at this point in the history
  • Loading branch information
thegridman committed Dec 2, 2024
1 parent 7ac0ad8 commit 61c79db
Show file tree
Hide file tree
Showing 9 changed files with 287 additions and 159 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2221,7 +2221,7 @@ else if (aSubscriberId.length == 1 && SubscriberId.NullSubscriber.equals(aSubscr
{
subscription.updateChannelAllocations(getChannelAllocationStrategy());
PagedTopicConfigMap.updateSubscription(configMap, subscription);
Logger.finest("Removed subscribers from " + Arrays.toString(aSubscriberId)
Logger.finest("Removed subscribers " + Arrays.toString(aSubscriberId)
+ " from subscription " + subscription.getKey());
}
}
Expand Down Expand Up @@ -2688,7 +2688,7 @@ public void __init()
// Private initializer
protected void __initPrivate()
{

super.__initPrivate();
}

Expand Down Expand Up @@ -3114,7 +3114,7 @@ public static class MemberWelcomeRequest
{
// ---- Fields declarations ----
private static com.tangosol.util.ListMap __mapChildren;

// Static initializer
static
{
Expand Down Expand Up @@ -5363,7 +5363,7 @@ public void entryDeleted(com.tangosol.util.MapEvent evt)
}
}
}

super.entryDeleted(evt);
service.onTopicLifecycle(dispatcher, TopicLifecycleEvent.Type.DESTROYED);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.tangosol.internal.net.topic.SubscriberConnector;
import com.tangosol.internal.net.topic.TopicSubscription;
import com.tangosol.internal.net.topic.impl.paged.model.SubscriberGroupId;
import com.tangosol.internal.net.topic.impl.paged.model.SubscriberId;
import com.tangosol.net.topic.Position;
import com.tangosol.net.topic.Subscriber;
import com.tangosol.net.topic.TopicDependencies;
Expand Down Expand Up @@ -90,12 +91,6 @@ public Position[] initialize(ConnectedSubscriber<V> subscriber, boolean fForceRe
return ensureRunningConnector().initialize(subscriber, fForceReconnect, fReconnect, fDisconnected);
}

@Override
public void initializeSubscription(ConnectedSubscriber<V> subscriber, boolean fForceReconnect)
{
ensureRunningConnector().initializeSubscription(subscriber, fForceReconnect);
}

@Override
public long getConnectionTimestamp()
{
Expand Down Expand Up @@ -123,9 +118,15 @@ public long getSubscriptionId()
}

@Override
public Position[] initializeSubscriptionHeads(ConnectedSubscriber<V> subscriber, boolean fReconnect, boolean fDisconnected)
public SubscriberId getSubscriberId()
{
return ensureRunningConnector().getSubscriberId();
}

@Override
public SubscriberGroupId getSubscriberGroupId()
{
return ensureRunningConnector().initializeSubscriptionHeads(subscriber, fReconnect, fDisconnected);
return ensureRunningConnector().getSubscriberGroupId();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import com.tangosol.internal.net.topic.impl.paged.model.SubscriberGroupId;

import com.tangosol.internal.net.topic.impl.paged.model.SubscriberId;
import com.tangosol.net.topic.Position;
import com.tangosol.net.topic.Subscriber;
import com.tangosol.net.topic.TopicDependencies;
Expand Down Expand Up @@ -112,19 +113,6 @@ public Position[] initialize(ConnectedSubscriber<T> subscriber, boolean fForceRe
return f_connector.initialize((ConnectedSubscriber<F>) subscriber, fForceReconnect, fReconnect, fDisconnected);
}


@Override
public void initializeSubscription(ConnectedSubscriber<T> subscriber, boolean fForceReconnect)
{
f_connector.initializeSubscription((ConnectedSubscriber<F>) subscriber, fForceReconnect);
}

@Override
public Position[] initializeSubscriptionHeads(ConnectedSubscriber<T> subscriber, boolean fReconnect, boolean fDisconnected)
{
return f_connector.initializeSubscriptionHeads((ConnectedSubscriber<F>) subscriber, fReconnect, fDisconnected);
}

@Override
public boolean ensureSubscription(ConnectedSubscriber<T> subscriber, long subscriptionId, boolean fForceReconnect)
{
Expand Down Expand Up @@ -271,6 +259,18 @@ public long getSubscriptionId()
return f_connector.getSubscriptionId();
}

@Override
public SubscriberId getSubscriberId()
{
return f_connector.getSubscriberId();
}

@Override
public SubscriberGroupId getSubscriberGroupId()
{
return f_connector.getSubscriberGroupId();
}

// ----- data members ---------------------------------------------------

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.oracle.coherence.common.base.Logger;
import com.oracle.coherence.common.base.TimeHelper;

import com.oracle.coherence.common.util.Options;
import com.oracle.coherence.common.util.Sentry;

import com.tangosol.coherence.config.Config;
Expand All @@ -29,7 +30,6 @@

import com.tangosol.net.Cluster;
import com.tangosol.net.FlowControl;
import com.tangosol.net.Member;
import com.tangosol.net.Service;

import com.tangosol.net.management.MBeanHelper;
Expand Down Expand Up @@ -93,7 +93,6 @@
import java.util.function.Function;

import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* A subscriber of values from a paged topic.
Expand Down Expand Up @@ -121,57 +120,20 @@ public class NamedTopicSubscriber<V>
*/
public <T> NamedTopicSubscriber(NamedTopic<?> topic, SubscriberConnector<V> connector, Option<? super T, V>[] options)
{
this(topic, connector, null, null, options);
}

/**
* Create a {@link NamedTopicSubscriber}.
*
* @param topic the underlying {@link NamedTopic} that this subscriber is subscribed to
* @param connector the connector to connect to server side resources
* @param subscriberId the optional {@link SubscriberId} to identify this subscriber
* @param groupId the optional subscriber group id
* @param options the {@link Option}s controlling this {@link NamedTopicSubscriber}
*
* @throws NullPointerException if the {@code topic} or {@code caches} parameters are {@code null}
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public <T> NamedTopicSubscriber(NamedTopic<?> topic, SubscriberConnector<V> connector, SubscriberId subscriberId,
SubscriberGroupId groupId, Option<? super T, V>[] options)
{
f_topic = Objects.requireNonNull(topic);
f_connector = connector;
f_sTopicName = topic.getName();
f_gate = new ThreadGateLite<>();
f_gateState = new ThreadGateLite<>();
f_converter = new SerializerValueConverter<>(topic.getService().getSerializer());
OptionSet<T, V> optionSet = NamedTopicSubscriber.optionsFrom(options);

OptionSet<T, V> optionSet = Subscriber.optionsFrom(options);

if (subscriberId == null)
{
WithNotificationId withId = optionSet.get(WithNotificationId.class);
if (withId == null)
{
f_nNotificationId = System.identityHashCode(this);
}
else
{
f_nNotificationId = withId.getId();
}

Member member = topic.getService().getCluster().getLocalMember();
f_subscriberId = new SubscriberId(f_nNotificationId, member.getId(), member.getUuid());
}
else
{
f_subscriberId = subscriberId;
f_nNotificationId = subscriberId.getNotificationId();
}
f_topic = Objects.requireNonNull(topic);
f_connector = connector;
f_sTopicName = topic.getName();
f_gate = new ThreadGateLite<>();
f_gateState = new ThreadGateLite<>();
f_converter = new SerializerValueConverter<>(topic.getService().getSerializer());
f_subscriberId = connector.getSubscriberId();
f_nNotificationId = f_subscriberId.getNotificationId();

long nId = f_subscriberId.getId();

f_subscriberGroupId = groupId != null ? groupId : optionSet.getSubscriberGroupId();
f_subscriberGroupId = connector.getSubscriberGroupId();
f_fAnonymous = f_subscriberGroupId.isAnonymous();
f_fCompleteOnEmpty = optionSet.isCompleteOnEmpty();
f_key = new SubscriberInfo.Key(f_subscriberGroupId, nId);
Expand Down Expand Up @@ -557,8 +519,9 @@ public Set<Integer> getChannelSet()
{
if (m_nState == STATE_CONNECTED)
{
return IntStream.of(getChannels())
.boxed()
return Arrays.stream(m_aChannel)
.filter(TopicChannel::isOwned)
.map(Channel::getId)
.collect(Collectors.toSet());
}
}
Expand Down Expand Up @@ -2815,6 +2778,130 @@ public interface WithNotificationId<V, U>
int getId();
}

// ----- inner class: WithIdentifier ------------------------------------

/**
* An {@link Option} that provides a {@link SubscriberId} to a {@link NamedTopicSubscriber}.
*/
public interface WithSubscriberId<V, U>
extends Option<V, U>
{
/**
* Return the {@link SubscriberId} to use
*
* @param nNotificationId the notification id
*
* @return the {@link SubscriberId} to use
*/
SubscriberId getId(int nNotificationId);
}

// ----- inner class OptionSet ------------------------------------------

/**
* A holder of subscriber options.
*
* @param <V> the type of value in the underlying topic
* @param <U> the type of value the subscriber receives
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public static class OptionSet<V, U>
extends Options<Option<V, U>>
{
/**
* Create an option set.
*
* @param clsType the type of the options
* @param aOptions the array of options
*/
private OptionSet(Class<Option<V, U>> clsType, Option<V, U>[] aOptions)
{
super(clsType, aOptions);
}

/**
* Return the subscriber group name.
*
* @return the subscriber group name
*/
public Optional<String> getSubscriberGroupName()
{
Subscriber.Name nameOption = get(Subscriber.Name.class, null);
if (nameOption == null)
{
return Optional.empty();
}
String sName = nameOption.getName();
return sName == null || sName.isBlank() ? Optional.empty() : Optional.of(sName);
}

/**
* Return the subscriber group identifier.
*
* @return the subscriber group identifier
*/
public SubscriberGroupId getSubscriberGroupId()
{
return getSubscriberGroupName().map(SubscriberGroupId::withName)
.orElse(SubscriberGroupId.anonymous());
}

/**
* Return the optional {@link Filter} for the subscriber group
*
* @return the optional {@link Filter} for the subscriber group
*/
public Optional<Filter<U>> getFilter()
{
Subscriber.Filtered filtered = get(Subscriber.Filtered.class);
return filtered == null ? Optional.empty() : Optional.ofNullable(filtered.getFilter());
}

/**
* Return the optional {@link ValueExtractor} for the subscriber group
*
* @return the optional {@link ValueExtractor} for the subscriber group
*/
public Optional<ValueExtractor<U, ?>> getExtractor()
{
Subscriber.Convert convert = get(Subscriber.Convert.class);
return convert == null ? Optional.empty() : Optional.ofNullable(convert.getExtractor());
}

/**
* Return {@code true} if the subscriber should complete receive requests when empty.
*
* @return {@code true} if the subscriber should complete receive requests when empty
*/
public boolean isCompleteOnEmpty()
{
return contains(Subscriber.CompleteOnEmpty.class);
}

/**
* Return an array of {@link ChannelOwnershipListener} instances.
*
* @return an array of {@link ChannelOwnershipListener} instances
*/
public ChannelOwnershipListener[] getChannelListeners()
{
ChannelOwnershipListeners<V> listeners = get(ChannelOwnershipListeners.class);
if (listeners == null)
{
return new ChannelOwnershipListener[0];
}
List<ChannelOwnershipListener> list = listeners.getListeners();
return list.toArray(ChannelOwnershipListener[]::new);
}
}

@SuppressWarnings({"unchecked", "rawtypes"})
public static <V, U> OptionSet<V, U> optionsFrom(Option<? super V, U>[] options)
{
Class<?> clsType = Option.class;
return new OptionSet(clsType, options);
}

// ----- inner class: CommittableElement --------------------------------

/**
Expand Down Expand Up @@ -3756,6 +3843,7 @@ public void onEvent(SubscriberConnector.SubscriberEvent evt)
disconnectInternal(false);
break;
case ChannelPopulated:
// must use the channel executor
CompletableFuture.runAsync(() -> onChannelPopulatedNotification(evt.getPopulatedChannels()), f_executorChannels);
break;
case Destroyed:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,17 @@ public SortedMap<Integer, Set<SubscriberId>> cleanup(SortedMap<Long, SubscriberI
SubscriberId subscriberId = entry.getValue();
UUID uuid = subscriberId.getUID();
int nMemberId = subscriberId.getMemberId();
if (uuid == null && !setMemberID.contains(nMemberId))
{
mapRemoved.compute(nMemberId, (key, set) -> ensureSet(nMemberId, subscriberId, set));
}
else if (!setMemberUID.contains(uuid))

if (nMemberId != 0) // zero member id is an Extend client so we cannot do anything here
{
mapRemoved.compute(nMemberId, (key, set) -> ensureSet(nMemberId, subscriberId, set));
if (uuid == null && !setMemberID.contains(nMemberId))
{
mapRemoved.compute(nMemberId, (key, set) -> ensureSet(nMemberId, subscriberId, set));
}
else if (!setMemberUID.contains(uuid))
{
mapRemoved.compute(nMemberId, (key, set) -> ensureSet(nMemberId, subscriberId, set));
}
}
}

Expand Down
Loading

0 comments on commit 61c79db

Please sign in to comment.