Skip to content

Commit

Permalink
NIFI-14043 Add support for keySet in HazelcastMapCacheClient (#9553)
Browse files Browse the repository at this point in the history
Signed-off-by: David Handermann <[email protected]>
  • Loading branch information
EndzeitBegins authored Nov 25, 2024
1 parent cfa495b commit 2199d64
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.nifi.hazelcast.services.cache;

import java.util.Set;
import java.util.function.Predicate;

/**
Expand All @@ -32,7 +33,14 @@ public interface HazelcastCache {
String name();

/**
* Returns the value of the cache entry defined by the the key.
* Returns a set of all keys currently in the cache
*
* @return The Set of all keys currently in the cache
*/
Set<String> keySet();

/**
* Returns the value of the cache entry defined by the key.
*
* @param key Key of the entry, must not be null.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ public String name() {
return storage.getName();
}

@Override
public Set<String> keySet() {
return storage.keySet();
}

@Override
public byte[] get(final String key) {
return storage.get(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,23 @@

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.regex.Pattern;

/**
* An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache.
*
* Note: By design, the client should not directly depend on Hazelcast specific classes to allow easy version and implementation changes.
*/
@Tags({ "hazelcast", "cache", "map"})
@Tags({"hazelcast", "cache", "map"})
@CapabilityDescription("An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache. This service relies on " +
"an other controller service, manages the actual Hazelcast calls, set in Hazelcast Cache Manager.")
public class HazelcastMapCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient<Long> {
Expand Down Expand Up @@ -116,9 +116,18 @@ public void onDisabled() {
}
}

@Override
public <K> Set<K> keySet(Deserializer<K> keyDeserializer) throws IOException {
final HashSet<K> keySet = new HashSet<>();
for (String key : cache.keySet()) {
keySet.add(parseCacheEntryKey(key, keyDeserializer));
}
return keySet;
}

@Override
public <K, V> AtomicCacheEntry<K, V, Long> fetch(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
final byte[] result = cache.get(getCacheEntryKey(key, keySerializer));
final byte[] result = cache.get(serializeCacheEntryKey(key, keySerializer));
return (result == null) ? null : new AtomicCacheEntry<>(key, parsePayload(valueDeserializer, result), parseRevision(result));
}

Expand All @@ -128,18 +137,18 @@ public <K, V> boolean replace(final AtomicCacheEntry<K, V, Long> entry, final Se
return false;
}

final String key = getCacheEntryKey(entry.getKey(), keySerializer);
final String key = serializeCacheEntryKey(entry.getKey(), keySerializer);

try (final HazelcastCache.HazelcastCacheEntryLock lock = cache.acquireLock(key)) {
final byte[] oldValue = cache.get(key);

if (oldValue == null && (!entry.getRevision().isPresent() || entry.getRevision().get() < STARTING_REVISION)) {
cache.put(key, serialize(entry.getValue(), valueSerializer, STARTING_REVISION));
if (oldValue == null && (entry.getRevision().isEmpty() || entry.getRevision().get() < STARTING_REVISION)) {
cache.put(key, serializeCacheEntryValue(entry.getValue(), valueSerializer, STARTING_REVISION));
getLogger().debug("Entry with key {} was added during replace", key);
return true;
} else if (oldValue != null && Objects.equals(entry.getRevision().get(), parseRevision(oldValue))) {
final long newRevision = entry.getRevision().get() + 1;
cache.put(key, serialize(entry.getValue(), valueSerializer, newRevision));
cache.put(key, serializeCacheEntryValue(entry.getValue(), valueSerializer, newRevision));
getLogger().debug("Entry with key {} was updated during replace, with revision {}", key, newRevision);
return true;
}
Expand All @@ -150,49 +159,36 @@ public <K, V> boolean replace(final AtomicCacheEntry<K, V, Long> entry, final Se

@Override
public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
return cache.putIfAbsent(getCacheEntryKey(key, keySerializer), serialize(value, valueSerializer, STARTING_REVISION)) == null;
return cache.putIfAbsent(serializeCacheEntryKey(key, keySerializer), serializeCacheEntryValue(value, valueSerializer, STARTING_REVISION)) == null;
}

@Override
public <K, V> V getAndPutIfAbsent(
final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final Deserializer<V> valueDeserializer
) throws IOException {
final byte[] result = cache.putIfAbsent(getCacheEntryKey(key, keySerializer), serialize(value, valueSerializer, STARTING_REVISION));
final byte[] result = cache.putIfAbsent(serializeCacheEntryKey(key, keySerializer), serializeCacheEntryValue(value, valueSerializer, STARTING_REVISION));
return (result == null) ? null : parsePayload(valueDeserializer, result);
}

@Override
public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
return cache.contains(getCacheEntryKey(key, keySerializer));
return cache.contains(serializeCacheEntryKey(key, keySerializer));
}

@Override
public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
cache.put(getCacheEntryKey(key, keySerializer), serialize(value, valueSerializer, STARTING_REVISION));
cache.put(serializeCacheEntryKey(key, keySerializer), serializeCacheEntryValue(value, valueSerializer, STARTING_REVISION));
}

@Override
public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
final byte[] result = cache.get(getCacheEntryKey(key, keySerializer));
final byte[] result = cache.get(serializeCacheEntryKey(key, keySerializer));
return result == null ? null : parsePayload(valueDeserializer, result);
}

@Override
public <K> boolean remove(final K key, final Serializer<K> keySerializer) throws IOException {
return cache.remove(getCacheEntryKey(key, keySerializer));
}

private static class RegexPredicate implements Predicate<String>, Serializable {
private final Pattern pattern;

private RegexPredicate(final String regex) {
this.pattern = Pattern.compile(regex);
}

@Override
public boolean test(final String string) {
return pattern.matcher(string).matches();
}
return cache.remove(serializeCacheEntryKey(key, keySerializer));
}

@Override
Expand All @@ -205,23 +201,15 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTY_DESCRIPTORS;
}

private static long parseRevision(final byte[] value) {
return ByteBuffer.wrap(Arrays.copyOfRange(value, 0, Long.BYTES)).getLong();
}

private static <V> V parsePayload(final Deserializer<V> deserializer, final byte[] value) throws IOException {
return deserializer.deserialize(Arrays.copyOfRange(value, Long.BYTES, value.length));
}

private <S> String getCacheEntryKey(final S key, final Serializer<S> serializer) throws IOException {
private <S> String serializeCacheEntryKey(final S key, final Serializer<S> serializer) throws IOException {
final String result;

if (key instanceof String) {
result = (String) key;
} else {
final ByteArrayOutputStream stream = new ByteArrayOutputStream();
serializer.serialize(key, stream);
result = stream.toString("UTF-8");
result = stream.toString(StandardCharsets.UTF_8);
}

if (result.isEmpty()) {
Expand All @@ -231,27 +219,39 @@ private <S> String getCacheEntryKey(final S key, final Serializer<S> serializer)
return result;
}

private static <K> K parseCacheEntryKey(final String key, final Deserializer<K> keyDeserializer) throws IOException {
final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);

return keyDeserializer.deserialize(keyBytes);
}

/**
* Serializes a value using the given serializer. The first eight bytes of the array contains the revision.
* The rest holds the actual serialized value.
*
* @param value The value to serialize.
* @param value The value to serialize.
* @param serializer The serializer to use in order to serialize the incoming value.
* @param version The version of the entry.
* @param <S> The type of the value to be serialized.
*
* @param version The version of the entry.
* @param <S> The type of the value to be serialized.
* @return Byte array containing both version and value of the cache entry.
*
* @throws IOException In case of any issue during working with intermediate byte stream.
*/
private <S> byte[] serialize(final S value, final Serializer<S> serializer, final long version) throws IOException {
private <S> byte[] serializeCacheEntryValue(final S value, final Serializer<S> serializer, final long version) throws IOException {
final ByteArrayOutputStream stream = new ByteArrayOutputStream();

stream.write(getVersionByteArray(version));
serializer.serialize(value, stream);
return stream.toByteArray();
}

private static long parseRevision(final byte[] value) {
return ByteBuffer.wrap(Arrays.copyOfRange(value, 0, Long.BYTES)).getLong();
}

private static <V> V parsePayload(final Deserializer<V> deserializer, final byte[] value) throws IOException {
return deserializer.deserialize(Arrays.copyOfRange(value, Long.BYTES, value.length));
}

private byte[] getVersionByteArray(final long version) {
return ByteBuffer.allocate(Long.BYTES).putLong(version).array();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ public String name() {
return name;
}

@Override
public Set<String> keySet() {
return values.keySet();
}

@Override
public byte[] get(final String key) {
return values.get(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -50,6 +51,20 @@ public void setUp() {
testSubject = new IMapBasedHazelcastCache(storage, TTL);
}

@Test
void testKeySet() {
// given
final Set<String> keys = Set.of(KEY, KEY_2);
Mockito.when(storage.keySet()).thenReturn(keys);

// when
final Set<String> result = testSubject.keySet();

// then
Mockito.verify(storage).keySet();
assertEquals(keys, result);
}

@Test
public void testGet() {
// given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
import org.mockito.junit.jupiter.MockitoExtension;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -291,6 +293,40 @@ public void testSerialization() throws Exception {
assertEquals(value, result);
}

@Test
void testKeySetOnEmptyCache() throws IOException {
thenKeySetEquals(Set.of(), SERIALIZER);
}

@Test
void testKeySetOnNonEmptyCache() throws IOException {
//when
whenPutEntry("key1", "1-value");
whenPutEntry("key2", "2-value");
whenPutEntry("key3", "3-value");
whenPutEntry("key4", "4-value");

// then
thenKeySetEquals(Set.of("key1", "key2", "key3", "key4"), SERIALIZER);
}

@Test
void testKeyWithNonStringKeys() throws IOException {
// given
final Serializer<Integer> nonStringKeySerializer =
(value, output) -> output.write(ByteBuffer.allocate(4).putInt(value).array());
final Deserializer<Integer> nonStringKeyDeserializer = input -> ByteBuffer.wrap(input).getInt();

// when
testSubject.put(1, "1-value", nonStringKeySerializer, SERIALIZER);
testSubject.put(2, "2-value", nonStringKeySerializer, SERIALIZER);
testSubject.put(3, "3-value", nonStringKeySerializer, SERIALIZER);
testSubject.put(4, "4-value", nonStringKeySerializer, SERIALIZER);

// then
thenKeySetEquals(Set.of(1, 2, 3, 4), nonStringKeyDeserializer);
}

private void whenRemoveEntryIsSuccessful() throws IOException {
assertTrue(testSubject.remove(KEY, SERIALIZER));
}
Expand Down Expand Up @@ -329,6 +365,10 @@ private void whenReplaceEntryIsFailed(final Long version, final String newValue)
assertFalse(testSubject.replace(cacheEntry, SERIALIZER, SERIALIZER));
}

private <K> void thenKeySetEquals(final Set<K> keys, final Deserializer<K> keyDeserializer) throws IOException {
assertEquals(testSubject.keySet(keyDeserializer), keys);
}

private void thenEntryIsNotInCache(final String key) throws IOException {
assertFalse(testSubject.containsKey(key, SERIALIZER));
}
Expand Down

0 comments on commit 2199d64

Please sign in to comment.