Skip to content

Commit

Permalink
Fix Lambda sink DLQ support (opensearch-project#5207)
Browse files Browse the repository at this point in the history
Signed-off-by: Kondaka <[email protected]>
  • Loading branch information
kkondaka authored Nov 20, 2024
1 parent dee5255 commit 7546062
Show file tree
Hide file tree
Showing 7 changed files with 533 additions and 239 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,11 @@ public void setup() {
when(pluginSetting.getName()).thenReturn("name");
lambdaSinkConfig = mock(LambdaSinkConfig.class);
when(lambdaSinkConfig.getFunctionName()).thenReturn(functionName);
//when(lambdaSinkConfig.getMaxConnectionRetries()).thenReturn(3);
when(lambdaSinkConfig.getDlqPluginSetting()).thenReturn(null);

InvocationType sinkInvocationType = mock(InvocationType.class);
when(sinkInvocationType.getAwsLambdaValue()).thenReturn(InvocationType.EVENT.getAwsLambdaValue());
when(lambdaSinkConfig.getInvocationType()).thenReturn(invocationType);
//when(lambdaSinkConfig.getConnectionTimeout()).thenReturn(DEFAULT_CONNECTION_TIMEOUT);
when(lambdaSinkConfig.getBatchOptions()).thenReturn(batchOptions);
when(lambdaSinkConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,18 @@
import org.opensearch.dataprepper.plugins.lambda.common.config.ClientOptions;
import org.opensearch.dataprepper.plugins.lambda.sink.dlq.DlqPushHandler;
import org.opensearch.dataprepper.plugins.lambda.sink.dlq.LambdaSinkFailedDlqData;
import org.opensearch.dataprepper.model.failures.DlqObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.lambda.LambdaAsyncClient;
import software.amazon.awssdk.services.lambda.model.InvokeResponse;

import java.net.HttpURLConnection;
import java.time.Duration;
import java.util.Collection;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -109,11 +112,7 @@ public LambdaSink(final PluginSetting pluginSetting,
clientOptions
);
if (lambdaSinkConfig.getDlqPluginSetting() != null) {
this.dlqPushHandler = new DlqPushHandler(pluginFactory,
String.valueOf(lambdaSinkConfig.getDlqPluginSetting().get(BUCKET)),
lambdaSinkConfig.getDlqStsRoleARN()
, lambdaSinkConfig.getDlqStsRegion(),
String.valueOf(lambdaSinkConfig.getDlqPluginSetting().get(KEY_PATH)));
this.dlqPushHandler = new DlqPushHandler(pluginFactory, pluginSetting, lambdaSinkConfig.getDlq(), lambdaSinkConfig.getAwsAuthenticationOptions());
}

}
Expand Down Expand Up @@ -161,8 +160,7 @@ public void doOutput(final Collection<Record<Event>> records) {
outputCodecContext);
} catch (Exception e) {
LOG.error("Exception while processing records ", e);
//TODO: introduce DLQ handler here before releasing the records
releaseEventHandlesPerBatch(false, records);
handleFailure(records, e, HttpURLConnection.HTTP_BAD_REQUEST);
}

for (Map.Entry<Buffer, CompletableFuture<InvokeResponse>> entry : bufferToFutureMap.entrySet()) {
Expand All @@ -179,7 +177,7 @@ public void doOutput(final Collection<Record<Event>> records) {
throw new RuntimeException(errorMessage);
}

releaseEventHandlesPerBatch(true, inputBuffer.getRecords());
releaseEventHandles(inputBuffer.getRecords(), true);
numberOfRecordsSuccessCounter.increment(inputBuffer.getEventCount());
numberOfRequestsSuccessCounter.increment();
if (response.payload() != null) {
Expand All @@ -188,35 +186,59 @@ public void doOutput(final Collection<Record<Event>> records) {

} catch (Exception e) {
LOG.error(NOISY, e.getMessage(), e);
numberOfRecordsFailedCounter.increment(inputBuffer.getEventCount());
numberOfRequestsFailedCounter.increment();
handleFailure(new RuntimeException("failed"), inputBuffer);
handleFailure(inputBuffer.getRecords(), new RuntimeException("failed"), HttpURLConnection.HTTP_INTERNAL_ERROR);
}
}
}


void handleFailure(Throwable throwable, Buffer flushedBuffer) {
try {
numberOfRecordsFailedCounter.increment(flushedBuffer.getEventCount());
SdkBytes payload = flushedBuffer.getPayload();
if (dlqPushHandler != null) {
dlqPushHandler.perform(pluginSetting,
new LambdaSinkFailedDlqData(payload, throwable.getMessage(), 0));
releaseEventHandlesPerBatch(true, flushedBuffer.getRecords());
} else {
releaseEventHandlesPerBatch(false, flushedBuffer.getRecords());
}
} catch (Exception ex) {
LOG.error("Exception occurred during error handling");
releaseEventHandlesPerBatch(false, flushedBuffer.getRecords());
}
/**
 * Builds a {@link DlqObject} describing a single failed event so it can be
 * written to the configured dead-letter queue.
 *
 * @param event        the event whose delivery to Lambda failed
 * @param functionName Lambda function name recorded with the failure
 * @param status       status code recorded with the failure
 * @param message      failure message recorded with the failure
 * @return a populated DlqObject carrying the event's JSON payload and its event handle
 */
private DlqObject createDlqObjectFromEvent(final Event event,
                                           final String functionName,
                                           final int status,
                                           final String message) {
    // Capture the failure details first, then wrap them in the DLQ envelope.
    final LambdaSinkFailedDlqData failedData = LambdaSinkFailedDlqData.builder()
            .withData(event.toJsonString())
            .withStatus(status)
            .withFunctionName(functionName)
            .withMessage(message)
            .build();
    return DlqObject.builder()
            .withEventHandle(event.getEventHandle())
            .withFailedData(failedData)
            .withPluginName(pluginSetting.getName())
            .withPipelineName(pluginSetting.getPipelineName())
            .withPluginId(pluginSetting.getName())
            .build();
}

/**
 * Routes failed records to the DLQ (when one is configured) and releases
 * their event handles.
 *
 * @param failedRecords records whose Lambda invocation failed
 * @param throwable     cause of the failure; its message is recorded in each DLQ entry
 * @param statusCode    HTTP-style status code recorded with each DLQ entry
 */
void handleFailure(Collection<Record<Event>> failedRecords, Throwable throwable, int statusCode) {
    if (failedRecords.isEmpty()) {
        return;
    }
    numberOfRecordsFailedCounter.increment(failedRecords.size());
    numberOfRequestsFailedCounter.increment();
    if (dlqPushHandler == null) {
        // No DLQ configured: release the handles as failures and stop here.
        // The early return prevents the NullPointerException (and the double
        // release via the catch block) that dereferencing dlqPushHandler
        // below would otherwise cause.
        releaseEventHandles(failedRecords, false);
        return;
    }
    try {
        final List<DlqObject> dlqObjects = new ArrayList<>();
        for (Record<Event> record : failedRecords) {
            if (record.getData() != null) {
                dlqObjects.add(createDlqObjectFromEvent(record.getData(),
                        lambdaSinkConfig.getFunctionName(), statusCode, throwable.getMessage()));
            }
        }
        dlqPushHandler.perform(dlqObjects);
        releaseEventHandles(failedRecords, true);
    } catch (Exception ex) {
        // Log with the cause attached so the stack trace is preserved.
        LOG.error("Exception occurred during error handling", ex);
        releaseEventHandles(failedRecords, false);
    }
}


/*
* Release events per batch
*/
private void releaseEventHandlesPerBatch(boolean success, Collection<Record<Event>> records) {
private void releaseEventHandles(Collection<Record<Event>> records, boolean success) {
for (Record<Event> record : records) {
Event event = record.getData();
if (event != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,22 @@
*/
package org.opensearch.dataprepper.plugins.lambda.sink.dlq;

import com.fasterxml.jackson.databind.ObjectWriter;
import io.micrometer.core.instrument.util.StringUtils;
import org.opensearch.dataprepper.metrics.MetricNames;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.failures.DlqObject;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.plugins.dlq.DlqProvider;
import org.opensearch.dataprepper.plugins.dlq.DlqWriter;
import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.StringJoiner;

import static java.util.UUID.randomUUID;


/**
 * A handler class that helps log failed data to an AWS S3 bucket or a file, based on configuration.
Expand All @@ -38,94 +29,51 @@ public class DlqPushHandler {

private static final Logger LOG = LoggerFactory.getLogger(DlqPushHandler.class);

private static final String BUCKET = "bucket";

private static final String ROLE_ARN = "sts_role_arn";

private static final String REGION = "region";

private static final String S3_PLUGIN_NAME = "s3";

private static final String KEY_PATH_PREFIX = "key_path_prefix";

private String dlqFile;
public static final String STS_ROLE_ARN = "sts_role_arn";

private String keyPathPrefix;
public static final String REGION = "region";

private DlqProvider dlqProvider;

private ObjectWriter objectWriter;

public DlqPushHandler(
final PluginFactory pluginFactory,
final String bucket,
final String stsRoleArn,
final String awsRegion,
final String dlqPathPrefix) {

this.dlqProvider = getDlqProvider(pluginFactory,bucket,stsRoleArn,awsRegion,dlqPathPrefix);
}

public void perform(final PluginSetting pluginSetting,
final Object failedData) {
if(dlqFile != null)
writeToFile(failedData);
else
pushToS3(pluginSetting, failedData);
}

private void writeToFile(Object failedData) {
try(BufferedWriter dlqFileWriter = Files.newBufferedWriter(Paths.get(dlqFile),
StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
dlqFileWriter.write(objectWriter.writeValueAsString(failedData)+"\n");
} catch (IOException e) {
LOG.error("Exception while writing failed data to DLQ file Exception: ",e);
private PluginSetting dlqPluginSetting;

private DlqWriter dlqWriter;

public DlqPushHandler(final PluginFactory pluginFactory, final PluginSetting pluginSetting,
final PluginModel dlqConfig, final AwsAuthenticationOptions awsAuthenticationOptions) {
dlqPluginSetting = new PluginSetting(dlqConfig.getPluginName(), dlqConfig.getPluginSettings());
dlqPluginSetting.setPipelineName(pluginSetting.getPipelineName());
Map<String, Object> dlqSettings = dlqPluginSetting.getSettings();
boolean settingsChanged = false;
if (!dlqSettings.containsKey(REGION)) {
if (awsAuthenticationOptions != null) {
dlqSettings.put(REGION, String.valueOf(awsAuthenticationOptions.getAwsRegion()));
settingsChanged = true;
}
}
}

private void pushToS3(PluginSetting pluginSetting, Object failedData) {
DlqWriter dlqWriter = getDlqWriter(pluginSetting.getPipelineName());
try {
String pluginId = randomUUID().toString();
DlqObject dlqObject = DlqObject.builder()
.withPluginId(pluginId)
.withPluginName(pluginSetting.getName())
.withPipelineName(pluginSetting.getPipelineName())
.withFailedData(failedData)
.build();
final List<DlqObject> dlqObjects = Arrays.asList(dlqObject);
dlqWriter.write(dlqObjects, pluginSetting.getPipelineName(), pluginId);
LOG.info("wrote {} events to DLQ",dlqObjects.size());
} catch (final IOException e) {
LOG.error("Exception while writing failed data to DLQ, Exception : ", e);
if (!dlqSettings.containsKey(STS_ROLE_ARN)) {
if (awsAuthenticationOptions != null) {
dlqSettings.put(STS_ROLE_ARN, String.valueOf(awsAuthenticationOptions.getAwsStsRoleArn()));
settingsChanged = true;
}
}
if (settingsChanged) {
LOG.info("Using AWS credentials from Lambda Sink Config");
dlqPluginSetting.setSettings(dlqSettings);
}
this.dlqProvider = pluginFactory.loadPlugin(DlqProvider.class, dlqPluginSetting);
if (this.dlqProvider != null) {
Optional<DlqWriter> potentialDlq = this.dlqProvider.getDlqWriter(new StringJoiner(MetricNames.DELIMITER)
.add(pluginSetting.getPipelineName())
.add(pluginSetting.getName()).toString());
this.dlqWriter = potentialDlq.isPresent() ? potentialDlq.get() : null;
}
}

private DlqWriter getDlqWriter(final String writerPipelineName) {
Optional<DlqWriter> potentialDlq = dlqProvider.getDlqWriter(new StringJoiner(MetricNames.DELIMITER)
.add(writerPipelineName).toString());
DlqWriter dlqWriter = potentialDlq.isPresent() ? potentialDlq.get() : null;
return dlqWriter;
}

private DlqProvider getDlqProvider(final PluginFactory pluginFactory,
final String bucket,
final String stsRoleArn,
final String awsRegion,
final String dlqPathPrefix) {
final Map<String, Object> props = new HashMap<>();
props.put(BUCKET, bucket);
props.put(ROLE_ARN, stsRoleArn);
props.put(REGION, awsRegion);
this.keyPathPrefix = StringUtils.isEmpty(dlqPathPrefix) ? dlqPathPrefix : enforceDefaultDelimiterOnKeyPathPrefix(dlqPathPrefix);
props.put(KEY_PATH_PREFIX, dlqPathPrefix);
final PluginSetting dlqPluginSetting = new PluginSetting(S3_PLUGIN_NAME, props);
DlqProvider dlqProvider = pluginFactory.loadPlugin(DlqProvider.class, dlqPluginSetting);
return dlqProvider;
}

private String enforceDefaultDelimiterOnKeyPathPrefix(final String keyPathPrefix) {
return (keyPathPrefix.charAt(keyPathPrefix.length() - 1) == '/') ? keyPathPrefix : keyPathPrefix.concat("/");
/**
 * Writes the given DLQ objects using the configured DLQ writer.
 * A null/empty list, or an absent writer, results in a no-op.
 *
 * @param dlqObjects the failed-event entries to write to the DLQ
 * @throws Exception if the underlying DLQ writer fails
 */
public void perform(final List<DlqObject> dlqObjects) throws Exception {
    // Guard clauses: nothing to do without a writer or without entries.
    if (dlqWriter == null || dlqObjects == null || dlqObjects.isEmpty()) {
        return;
    }
    dlqWriter.write(dlqObjects, dlqPluginSetting.getPipelineName(), dlqPluginSetting.getName());
}
}

Loading

0 comments on commit 7546062

Please sign in to comment.