Skip to content
This repository has been archived by the owner on Feb 15, 2022. It is now read-only.

Commit

Permalink
Merge pull request #28 from opendistro-for-elasticsearch/service-map-…
Browse files Browse the repository at this point in the history
…stateful

Initial commit of service map stateful processor plugin
  • Loading branch information
AustinTag authored Sep 22, 2020
2 parents 86de643 + 094a27e commit 20047d5
Show file tree
Hide file tree
Showing 10 changed files with 788 additions and 121 deletions.
3 changes: 2 additions & 1 deletion dependency-versions.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ ext.versionMap = [
reflections : '0.9.11',
slf4j_api : '1.7.30',
slf4j_log4j12 : '1.7.30',
validation_api : '2.0.1.Final'
validation_api : '2.0.1.Final',
opentelemetry_proto : '0.6.0'
]
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ include 'situp-plugins:common'
include 'situp-plugins:apmtracesource'
include 'situp-plugins:elasticsearch'
include 'situp-plugins:lmdb-processor-state'
include 'situp-plugins:service-map-stateful'
Original file line number Diff line number Diff line change
Expand Up @@ -9,35 +9,40 @@
* @param <T>
* Type parameter for the value type. Keys will be Strings.
*/
public interface ProcessorState<T> {
public interface ProcessorState<K, V> {

/**
* Puts a key value pair in the processor state
* @param key Key to put in the state
* @param value Value to map to the key
*/
void put(String key, T value);
void put(K key, V value);

/**
* Gets the value in the processor state for the given key
* @param key Key to look up value for
* @return Value for key, if it exists. Otherwise null.
*/
T get(String key);
V get(K key);

/**
* Gets all the data in the processor state
* @return All the data in the processor state in the form of a Map
*/
Map<String, T> getAll();
Map<K, V> getAll();

/**
* Iterate over the processor state with a bifunction
* @param fn BiFunction with which to iterate over the processor state
* @param <R> Type parameter for return value of BiFunction
* @return Result of iteration as a list of objects of type R
*/
public<R> List<R> iterate(BiFunction<String, T, R> fn);
public<R> List<R> iterate(BiFunction<K, V, R> fn);

/**
* @return Size of the processor state, in terms of number of elements stored.
*/
public long size();

/**
* Clears the data in the processor state
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

import com.amazon.situp.processor.state.ProcessorState;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -17,45 +19,64 @@ public abstract class ProcessorStateTest {

protected static final Random random = new Random();

protected ProcessorState<DataClass> processorState;
protected ProcessorState<byte[], DataClass> processorState;

@Before
public abstract void setProcessorState() throws Exception;

@Test
public void testSize() {
//Put two random Pojos in the processor state
final DataClass data1 = new DataClass(UUID.randomUUID().toString(), random.nextInt());
final byte[] key1 = UUID.randomUUID().toString().getBytes();

final DataClass data2 = new DataClass(UUID.randomUUID().toString(), random.nextInt());
final byte[] key2 = UUID.randomUUID().toString().getBytes();

processorState.put(key1, data1);
processorState.put(key2, data2);

Assert.assertEquals(2, processorState.size());
}

@Test
public void testPutAndGet() {
//Put two random Pojos in the processor state
final DataClass data1 = new DataClass(UUID.randomUUID().toString(), random.nextInt());
final String key1 = UUID.randomUUID().toString();
final byte[] key1 = UUID.randomUUID().toString().getBytes();

final DataClass data2 = new DataClass(UUID.randomUUID().toString(), random.nextInt());
final String key2 = UUID.randomUUID().toString();
final byte[] key2 = UUID.randomUUID().toString().getBytes();

processorState.put(key1, data1);
processorState.put(key2, data2);

//Read them and assert that they are correctly read, and assert incorrect key gives back null value
Assert.assertEquals(data1, processorState.get(key1));
Assert.assertEquals(data2, processorState.get(key2));
Assert.assertNull(processorState.get(UUID.randomUUID().toString()));
Assert.assertNull(processorState.get(UUID.randomUUID().toString().getBytes()));
}

@Test
public void testPutAndGetAll() {
//Put two random Pojos in the processor state
final DataClass data1 = new DataClass(UUID.randomUUID().toString(), random.nextInt());
final String key1 = UUID.randomUUID().toString();
final byte[] key1 = UUID.randomUUID().toString().getBytes();

final DataClass data2 = new DataClass(UUID.randomUUID().toString(), random.nextInt());
final String key2 = UUID.randomUUID().toString();
final byte[] key2 = UUID.randomUUID().toString().getBytes();

processorState.put(key1, data1);
processorState.put(key2, data2);

final Map<String, DataClass> stateMap = processorState.getAll();
//Using byte array as key, need to translate to String key for value comparision (instead of reference)
final Map<String, DataClass> stateMap = processorState.getAll()
.entrySet()
.stream()
.collect(Collectors.toMap( dataClassEntry -> new String(dataClassEntry.getKey()), dataClassEntry -> dataClassEntry.getValue()));
Assert.assertEquals(2, stateMap.size());
Assert.assertEquals(data1, stateMap.get(key1));
Assert.assertEquals(data2, stateMap.get(key2));
Assert.assertEquals(data1, stateMap.get(new String(key1)));
Assert.assertEquals(data2, stateMap.get(new String(key2)));
}

@After
Expand All @@ -67,17 +88,17 @@ public void teardown() {
@Test
public void testIterate() {
final DataClass data1 = new DataClass(UUID.randomUUID().toString(), random.nextInt());
final String key1 = UUID.randomUUID().toString();
final byte[] key1 = UUID.randomUUID().toString().getBytes();

final DataClass data2 = new DataClass(UUID.randomUUID().toString(), random.nextInt());
final String key2 = UUID.randomUUID().toString();
final byte[] key2 = UUID.randomUUID().toString().getBytes();

processorState.put(key1, data1);
processorState.put(key2, data2);

final Collection<String> iterateResult = processorState.iterate(new BiFunction<String, DataClass, String>() {
final Collection<String> iterateResult = processorState.iterate(new BiFunction<byte[], DataClass, String>() {
@Override
public String apply(String s, DataClass dataClass) {
public String apply(byte[] s, DataClass dataClass) {
processorState.get(s);
return dataClass.stringVal;
}
Expand Down
Loading

0 comments on commit 20047d5

Please sign in to comment.