Skip to content

Commit

Permalink
Fix count aggregation exemplar data (#4341)
Browse files Browse the repository at this point in the history
* Fix count aggregation exemplar data

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed comments

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed failing tests

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed failing tests

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
  • Loading branch information
3 people authored Apr 1, 2024
1 parent 36daff7 commit f7191c9
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.opentelemetry.proto.metrics.v1.AggregationTemporality;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
Expand All @@ -37,6 +36,7 @@
@DataPrepperPlugin(name = "count", pluginType = AggregateAction.class, pluginConfigurationType = CountAggregateActionConfig.class)
public class CountAggregateAction implements AggregateAction {
private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
private static final String exemplarKey = "__exemplar";
static final String EVENT_TYPE = "event";
static final String SUM_METRIC_NAME = "count";
static final String SUM_METRIC_DESCRIPTION = "Number of events";
Expand All @@ -46,7 +46,6 @@ public class CountAggregateAction implements AggregateAction {
public final String startTimeKey;
public final String outputFormat;
private long startTimeNanos;
private List<Exemplar> exemplarList;

@DataPrepperPluginConstructor
public CountAggregateAction(final CountAggregateActionConfig countAggregateActionConfig) {
Expand Down Expand Up @@ -86,8 +85,7 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct
groupState.put(startTimeKey, Instant.now());
groupState.putAll(aggregateActionInput.getIdentificationKeys());
groupState.put(countKey, 1);
exemplarList = new ArrayList<>();
exemplarList.add(createExemplar(event));
groupState.put(exemplarKey, createExemplar(event));
} else {
Integer v = (Integer)groupState.get(countKey) + 1;
groupState.put(countKey, v);
Expand All @@ -108,6 +106,8 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
.build();
} else {
Integer countValue = (Integer)groupState.get(countKey);
Exemplar exemplar = (Exemplar)groupState.get(exemplarKey);
groupState.remove(exemplarKey);
groupState.remove(countKey);
groupState.remove(startTimeKey);
long currentTimeNanos = getTimeNanos(Instant.now());
Expand All @@ -123,7 +123,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
.withUnit(SUM_METRIC_UNIT)
.withAggregationTemporality(AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA.name())
.withValue((double)countValue)
.withExemplars(exemplarList)
.withExemplars(List.of(exemplar))
.withAttributes(attr)
.build(false);
event = (Event)sum;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.metric.JacksonMetric;
import org.opensearch.dataprepper.model.metric.Exemplar;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
Expand All @@ -20,6 +19,7 @@
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionTestUtils;

import org.mockito.junit.jupiter.MockitoExtension;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -78,22 +78,34 @@ void testCountAggregate(int testCount) throws NoSuchFieldException, IllegalAcces
void testCountAggregateOTelFormat(int testCount) {
CountAggregateActionConfig countAggregateActionConfig = new CountAggregateActionConfig();
countAggregateAction = createObjectUnderTest(countAggregateActionConfig);
final String key = UUID.randomUUID().toString();
final String value = UUID.randomUUID().toString();
final String dataKey = UUID.randomUUID().toString();
Map<Object, Object> eventMap = Collections.singletonMap(key, value);
final String key1 = "key-"+UUID.randomUUID().toString();
final String value1 = UUID.randomUUID().toString();
final String dataKey1 = "datakey-"+UUID.randomUUID().toString();
final String key2 = "key-"+UUID.randomUUID().toString();
final String value2 = UUID.randomUUID().toString();
final String dataKey2 = "datakey-"+UUID.randomUUID().toString();
Map<Object, Object> eventMap = Collections.singletonMap(key1, value1);
Event testEvent = JacksonEvent.builder()
.withEventType("event")
.withData(eventMap)
.build();
final AggregateActionInput aggregateActionInput = new AggregateActionTestUtils.TestAggregateActionInput(eventMap);
Map<Object, Object> eventMap2 = Collections.singletonMap(key2, value2);
JacksonEvent testEvent2 = JacksonEvent.builder()
.withEventType("event")
.withData(eventMap2)
.build();
AggregateActionInput aggregateActionInput = new AggregateActionTestUtils.TestAggregateActionInput(eventMap);
AggregateActionInput aggregateActionInput2 = new AggregateActionTestUtils.TestAggregateActionInput(eventMap2);
for (int i = 0; i < testCount; i++) {
testEvent.put(dataKey, UUID.randomUUID().toString());
final AggregateActionResponse aggregateActionResponse = countAggregateAction.handleEvent(testEvent, aggregateActionInput);
testEvent.put(dataKey1, UUID.randomUUID().toString());
testEvent2.put(dataKey2, UUID.randomUUID().toString());
AggregateActionResponse aggregateActionResponse = countAggregateAction.handleEvent(testEvent, aggregateActionInput);
assertThat(aggregateActionResponse.getEvent(), equalTo(null));
aggregateActionResponse = countAggregateAction.handleEvent(testEvent2, aggregateActionInput2);
assertThat(aggregateActionResponse.getEvent(), equalTo(null));
}

final AggregateActionOutput actionOutput = countAggregateAction.concludeGroup(aggregateActionInput);
AggregateActionOutput actionOutput = countAggregateAction.concludeGroup(aggregateActionInput);
final List<Event> result = actionOutput.getEvents();
assertThat(result.size(), equalTo(1));
Map<String, Object> expectedEventMap = new HashMap<>();
Expand All @@ -109,7 +121,22 @@ void testCountAggregateOTelFormat(int testCount) {
assertThat(metric.toJsonString().indexOf("attributes"), not(-1));
assertThat(result.get(0).toMap(), hasKey("startTime"));
assertThat(result.get(0).toMap(), hasKey("time"));
List<Exemplar> exemplars = (List <Exemplar>)result.get(0).toMap().get("exemplars");
List<Map<String, Object>> exemplars = (List <Map<String, Object>>)result.get(0).toMap().get("exemplars");
assertThat(exemplars.size(), equalTo(1));
Map<String, Object> exemplar = exemplars.get(0);
Map<String, Object> attributes = (Map<String, Object>)exemplar.get("attributes");
assertThat(attributes.get(key1), equalTo(value1));
assertTrue(attributes.containsKey(dataKey1));

actionOutput = countAggregateAction.concludeGroup(aggregateActionInput2);
final List<Event> result2 = actionOutput.getEvents();
assertThat(result2.size(), equalTo(1));

exemplars = (List <Map<String, Object>>)result2.get(0).toMap().get("exemplars");
assertThat(exemplars.size(), equalTo(1));
exemplar = exemplars.get(0);
attributes = (Map<String, Object>)exemplar.get("attributes");
assertThat(attributes.get(key2), equalTo(value2));
assertTrue(attributes.containsKey(dataKey2));
}
}

0 comments on commit f7191c9

Please sign in to comment.