Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unregister topic listener on region shutdown #164

Merged
merged 2 commits into from
Jun 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,13 @@ public void start(final SessionFactoryOptions options, final Properties properti

@Override
public void stop() {
cleanupService.stop();
if (instanceLoader != null) {
log.info("Shutting down " + getClass().getSimpleName());
instanceLoader.unloadInstance();
instance = null;
instanceLoader = null;
}
cleanupService.stop();
}

public HazelcastInstance getHazelcastInstance() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public interface RegionCache {

void clear();

default void destroy() {
}

long size();

long getSizeInMemory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import com.hazelcast.instance.impl.OutOfMemoryErrorDispatcher;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
Expand All @@ -31,19 +33,23 @@ public final class CleanupService {
private final Duration fixedDelay;
private final String name;
private final ScheduledExecutorService executor;
private final List<LocalRegionCache> localRegionCaches;

public CleanupService(final String name, final Duration fixedDelay) {
this.fixedDelay = fixedDelay;
this.name = name;
executor = Executors.newSingleThreadScheduledExecutor(new CleanupThreadFactory());
localRegionCaches = new ArrayList<>();
}

public void registerCache(final LocalRegionCache cache) {
executor.scheduleWithFixedDelay(cache::cleanup, fixedDelay.toMillis(), fixedDelay.toMillis(), TimeUnit.MILLISECONDS);
localRegionCaches.add(cache);
}

public void stop() {
executor.shutdownNow();
localRegionCaches.forEach(LocalRegionCache::destroy);
}

private class CleanupThreadFactory implements ThreadFactory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -55,6 +56,7 @@ public class LocalRegionCache implements RegionCache {

protected final HazelcastInstance hazelcastInstance;
protected final ITopic<Object> topic;
protected final UUID listenerRegistrationId;
protected final MessageListener<Object> messageListener;
protected final ConcurrentMap<Object, Expirable> cache;
protected final Comparator versionComparator;
Expand Down Expand Up @@ -120,9 +122,10 @@ public LocalRegionCache(final String name, final HazelcastInstance hazelcastInst
messageListener = createMessageListener();
if (withTopic && hazelcastInstance != null) {
topic = hazelcastInstance.getTopic(name);
topic.addMessageListener(messageListener);
listenerRegistrationId = topic.addMessageListener(messageListener);
} else {
topic = null;
listenerRegistrationId = null;
}
this.evictionConfig = evictionConfig == null ? EvictionConfig.create(config) : evictionConfig;
}
Expand Down Expand Up @@ -293,6 +296,13 @@ public void clear() {
maybeNotifyTopic(null, null, null);
}

@Override
public void destroy() {
if (topic != null && listenerRegistrationId != null) {
topic.removeMessageListener(listenerRegistrationId);
}
}

@Override
public long size() {
return cache.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ protected AbstractHazelcastRegion(final HazelcastInstance instance, final String

@Override
public void destroy() throws CacheException {
// Destroy of the region should not propagate
// to other nodes of cluster.
// Do nothing on destroy.
this.getCache().destroy();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
package com.hazelcast.hibernate;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.hibernate.entity.DummyEntity;
import com.hazelcast.hibernate.instance.HazelcastAccessor;
import com.hazelcast.instance.impl.TestUtil;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.annotation.SlowTest;
import com.hazelcast.topic.impl.TopicService;
import org.hibernate.Session;
import org.hibernate.Transaction;
import org.hibernate.cfg.Environment;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.function.Function;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
Expand All @@ -20,6 +27,25 @@
@Category(SlowTest.class)
public class LocalRegionFactorySlowTest extends HibernateSlowTestSupport {

@Test
public void cleanUpRegisteredTopicOnRegionShutdown() {
HazelcastInstance sf1Instance = HazelcastAccessor.getHazelcastInstance(sf);
List<String> regions = new ArrayList<>(Arrays.asList(sf.getStatistics().getSecondLevelCacheRegionNames()));
// 'Query' cache is not subscribed to a topic in 5 and 52.
regions.removeIf(n -> n.contains("Query"));

Function<String, Integer> getRegistrationsCount = r -> TestUtil.getNode(sf1Instance).getNodeEngine()
.getEventService().getRegistrations(TopicService.SERVICE_NAME, r).size();

regions.forEach(r ->
assertEquals(r,2L, getRegistrationsCount.apply(r).longValue()));

sf2.close();

regions.forEach(r ->
assertEquals(r,1L, getRegistrationsCount.apply(r).longValue()));
}

@Test
public void test_query_with_non_mock_network() {
final int entityCount = 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,13 @@ public void start(final SessionFactoryOptions options, final Properties properti

@Override
public void stop() {
cleanupService.stop();
if (instanceLoader != null) {
log.info("Shutting down " + getClass().getSimpleName());
instanceLoader.unloadInstance();
instance = null;
instanceLoader = null;
}
cleanupService.stop();
}

public HazelcastInstance getHazelcastInstance() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public interface RegionCache {

void clear();

default void destroy() {
}

long size();

long getSizeInMemory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import com.hazelcast.instance.impl.OutOfMemoryErrorDispatcher;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
Expand All @@ -31,19 +33,23 @@ public final class CleanupService {
private final Duration fixedDelay;
private final String name;
private final ScheduledExecutorService executor;
private final List<LocalRegionCache> localRegionCaches;

public CleanupService(final String name, final Duration fixedDelay) {
this.fixedDelay = fixedDelay;
this.name = name;
executor = Executors.newSingleThreadScheduledExecutor(new CleanupThreadFactory());
localRegionCaches = new ArrayList<>();
}

public void registerCache(final LocalRegionCache cache) {
executor.scheduleWithFixedDelay(cache::cleanup, fixedDelay.toMillis(), fixedDelay.toMillis(), TimeUnit.MILLISECONDS);
localRegionCaches.add(cache);
}

public void stop() {
executor.shutdownNow();
localRegionCaches.forEach(LocalRegionCache::destroy);
}

private class CleanupThreadFactory implements ThreadFactory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -55,6 +56,7 @@ public class LocalRegionCache implements RegionCache {

protected final HazelcastInstance hazelcastInstance;
protected final ITopic<Object> topic;
protected final UUID listenerRegistrationId;
protected final MessageListener<Object> messageListener;
protected final ConcurrentMap<Object, Expirable> cache;
protected final Comparator versionComparator;
Expand Down Expand Up @@ -120,9 +122,10 @@ public LocalRegionCache(final String name, final HazelcastInstance hazelcastInst
messageListener = createMessageListener();
if (withTopic && hazelcastInstance != null) {
topic = hazelcastInstance.getTopic(name);
topic.addMessageListener(messageListener);
listenerRegistrationId = topic.addMessageListener(messageListener);
} else {
topic = null;
listenerRegistrationId = null;
}

this.evictionConfig = evictionConfig == null ? EvictionConfig.create(config) : evictionConfig;
Expand Down Expand Up @@ -294,6 +297,13 @@ public void clear() {
maybeNotifyTopic(null, null, null);
}

@Override
public void destroy() {
if (topic != null && listenerRegistrationId != null) {
topic.removeMessageListener(listenerRegistrationId);
}
}

@Override
public long size() {
return cache.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ protected AbstractHazelcastRegion(final HazelcastInstance instance, final String
}

public void destroy() throws CacheException {
// Destroy of the region should not propagate
// to other nodes of cluster.
// Do nothing on destroy.
this.getCache().destroy();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
package com.hazelcast.hibernate;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.hibernate.entity.DummyEntity;
import com.hazelcast.hibernate.instance.HazelcastAccessor;
import com.hazelcast.instance.impl.TestUtil;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.annotation.SlowTest;
import com.hazelcast.topic.impl.TopicService;
import org.hibernate.Session;
import org.hibernate.Transaction;
import org.hibernate.cfg.Environment;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.function.Function;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
Expand All @@ -20,6 +27,25 @@
@Category(SlowTest.class)
public class LocalRegionFactorySlowTest extends HibernateSlowTestSupport {

@Test
public void cleanUpRegisteredTopicOnRegionShutdown() {
HazelcastInstance sf1Instance = HazelcastAccessor.getHazelcastInstance(sf);
List<String> regions = new ArrayList<>(Arrays.asList(sf.getStatistics().getSecondLevelCacheRegionNames()));
// 'Query' cache is not subscribed to a topic in 5 and 52.
regions.removeIf(n -> n.contains("Query"));

Function<String, Integer> getRegistrationsCount = r -> TestUtil.getNode(sf1Instance).getNodeEngine()
.getEventService().getRegistrations(TopicService.SERVICE_NAME, r).size();

regions.forEach(r ->
assertEquals(r,2L, getRegistrationsCount.apply(r).longValue()));

sf2.close();

regions.forEach(r ->
assertEquals(r,1L, getRegistrationsCount.apply(r).longValue()));
}

@Test
public void test_query_with_non_mock_network() {
final int entityCount = 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,13 @@ protected void prepareForUse(final SessionFactoryOptions settings, final Map con
@SuppressWarnings("Duplicates")
@Override
protected void releaseFromUse() {
cleanupService.stop();
if (instanceLoader != null) {
log.info("Shutting down " + getClass().getSimpleName());
instanceLoader.unloadInstance();
instance = null;
instanceLoader = null;
}
cleanupService.stop();
}

private Properties toProperties(final Map configValues) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ protected RegionCache createTimestampsRegionCache(final String unqualifiedRegion
sessionFactory.getSessionFactoryOptions()
);

return new TimestampsRegionCache(this, qualifiedRegionName, instance);
TimestampsRegionCache timestampsRegionCache = new TimestampsRegionCache(this, qualifiedRegionName, instance);
cleanupService.registerCache(timestampsRegionCache);
enozcan marked this conversation as resolved.
Show resolved Hide resolved
return timestampsRegionCache;
}

public long nextTimestamp() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void putIntoCache(final Object key, final Object value, final SharedSessi

@Override
public void release() {
// no-op
delegate.destroy();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import com.hazelcast.instance.impl.OutOfMemoryErrorDispatcher;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
Expand All @@ -31,19 +33,23 @@ public final class CleanupService {
private final Duration fixedDelay;
private final String name;
private final ScheduledExecutorService executor;
private final List<LocalRegionCache> localRegionCaches;

public CleanupService(final String name, final Duration fixedDelay) {
this.fixedDelay = fixedDelay;
this.name = name;
executor = Executors.newSingleThreadScheduledExecutor(new CleanupThreadFactory());
localRegionCaches = new ArrayList<>();
}

public void registerCache(final LocalRegionCache cache) {
executor.scheduleWithFixedDelay(cache::cleanup, fixedDelay.toMillis(), fixedDelay.toMillis(), TimeUnit.MILLISECONDS);
localRegionCaches.add(cache);
}

public void stop() {
executor.shutdownNow();
localRegionCaches.forEach(LocalRegionCache::destroy);
}

private class CleanupThreadFactory implements ThreadFactory {
Expand Down
Loading