Skip to content

Commit

Permalink
[FLINK-33925][connectors/opensearch] Allow customising bulk failure h…
Browse files Browse the repository at this point in the history
…andling

Extracted `BulkResponseInspector` interface to allow custom handling of (partially) failed bulk requests. If not overridden, default behaviour remains unchanged and partial failures are escalated.

* fixes https://issues.apache.org/jira/browse/FLINK-33925
* allows custom metrics to be exposed
  • Loading branch information
schulzp authored and reswqa committed Mar 14, 2024
1 parent 177c865 commit 9e161cc
Show file tree
Hide file tree
Showing 7 changed files with 458 additions and 51 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.opensearch.sink;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.function.SerializableFunction;

import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;

/** Callback for inspecting a {@link BulkResponse}. */
@PublicEvolving
@FunctionalInterface
public interface BulkResponseInspector {

/**
* Callback to inspect a {@code response} in the context of its {@code request}. It may throw a
* {@link org.apache.flink.util.FlinkRuntimeException} to indicate that the bulk failed
* (partially).
*/
void inspect(BulkRequest request, BulkResponse response);

/**
* Factory interface for creating a {@link BulkResponseInspector} in the context of a sink.
* Allows obtaining a {@link org.apache.flink.metrics.MetricGroup} to capture custom metrics.
*/
@PublicEvolving
@FunctionalInterface
interface BulkResponseInspectorFactory
extends SerializableFunction<
BulkResponseInspectorFactory.InitContext, BulkResponseInspector> {

/**
* The interface exposes a subset of {@link
* org.apache.flink.api.connector.sink2.Sink.InitContext}.
*/
interface InitContext {

/** Returns: The metric group of the surrounding writer. */
MetricGroup metricGroup();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.opensearch.sink.BulkResponseInspector.BulkResponseInspectorFactory;

import org.apache.http.HttpHost;

Expand Down Expand Up @@ -60,7 +61,7 @@ public class OpensearchSink<IN> implements Sink<IN> {
private final NetworkClientConfig networkClientConfig;
private final DeliveryGuarantee deliveryGuarantee;
private final RestClientFactory restClientFactory;
private final FailureHandler failureHandler;
private final BulkResponseInspectorFactory bulkResponseInspectorFactory;

OpensearchSink(
List<HttpHost> hosts,
Expand All @@ -69,15 +70,15 @@ public class OpensearchSink<IN> implements Sink<IN> {
BulkProcessorConfig buildBulkProcessorConfig,
NetworkClientConfig networkClientConfig,
RestClientFactory restClientFactory,
FailureHandler failureHandler) {
BulkResponseInspectorFactory bulkResponseInspectorFactory) {
this.hosts = checkNotNull(hosts);
checkArgument(!hosts.isEmpty(), "Hosts cannot be empty.");
this.emitter = checkNotNull(emitter);
this.deliveryGuarantee = checkNotNull(deliveryGuarantee);
this.buildBulkProcessorConfig = checkNotNull(buildBulkProcessorConfig);
this.networkClientConfig = checkNotNull(networkClientConfig);
this.restClientFactory = checkNotNull(restClientFactory);
this.failureHandler = checkNotNull(failureHandler);
this.bulkResponseInspectorFactory = checkNotNull(bulkResponseInspectorFactory);
}

@Override
Expand All @@ -91,11 +92,16 @@ public SinkWriter<IN> createWriter(InitContext context) throws IOException {
context.metricGroup(),
context.getMailboxExecutor(),
restClientFactory,
failureHandler);
bulkResponseInspectorFactory.apply(context::metricGroup));
}

@VisibleForTesting
DeliveryGuarantee getDeliveryGuarantee() {
return deliveryGuarantee;
}

@VisibleForTesting
BulkResponseInspectorFactory getBulkResponseInspectorFactory() {
return bulkResponseInspectorFactory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.opensearch.sink.BulkResponseInspector.BulkResponseInspectorFactory;
import org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultBulkResponseInspector;
import org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultFailureHandler;
import org.apache.flink.util.InstantiationUtil;

import org.apache.http.HttpHost;

import java.util.Arrays;
import java.util.List;

import static org.apache.flink.connector.opensearch.sink.OpensearchWriter.DEFAULT_FAILURE_HANDLER;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
Expand Down Expand Up @@ -74,7 +76,8 @@ public class OpensearchSinkBuilder<IN> {
private Integer socketTimeout;
private Boolean allowInsecure;
private RestClientFactory restClientFactory;
private FailureHandler failureHandler = DEFAULT_FAILURE_HANDLER;
private FailureHandler failureHandler = new DefaultFailureHandler();
private BulkResponseInspectorFactory bulkResponseInspectorFactory;

public OpensearchSinkBuilder() {
restClientFactory = new DefaultRestClientFactory();
Expand Down Expand Up @@ -315,6 +318,20 @@ public OpensearchSinkBuilder<IN> setFailureHandler(FailureHandler failureHandler
return self();
}

/**
* Overrides the default {@link BulkResponseInspectorFactory}. A custom {@link
* BulkResponseInspector}, for example, can change the failure handling and capture additional
* metrics. See {@link #failureHandler} for a simpler way of handling failures.
*
* @param bulkResponseInspectorFactory the factory
* @return this builder
*/
public OpensearchSinkBuilder<IN> setBulkResponseInspectorFactory(
BulkResponseInspectorFactory bulkResponseInspectorFactory) {
this.bulkResponseInspectorFactory = checkNotNull(bulkResponseInspectorFactory);
return self();
}

/**
* Constructs the {@link OpensearchSink} with the properties configured this builder.
*
Expand All @@ -334,7 +351,13 @@ public OpensearchSink<IN> build() {
bulkProcessorConfig,
networkClientConfig,
restClientFactory,
failureHandler);
getBulkResponseInspectorFactory());
}

protected BulkResponseInspectorFactory getBulkResponseInspectorFactory() {
return this.bulkResponseInspectorFactory == null
? new DefaultBulkResponseInspectorFactory(failureHandler)
: this.bulkResponseInspectorFactory;
}

private NetworkClientConfig buildNetworkClientConfig() {
Expand Down Expand Up @@ -395,4 +418,23 @@ public String toString() {
+ '\''
+ '}';
}

/**
* Default factory for {@link FailureHandler}-bound {@link BulkResponseInspector
* BulkResponseInspectors}. A Static class is used instead of anonymous/lambda to avoid
* non-serializable references to {@link OpensearchSinkBuilder}.
*/
static class DefaultBulkResponseInspectorFactory implements BulkResponseInspectorFactory {

private final FailureHandler failureHandler;

DefaultBulkResponseInspectorFactory(FailureHandler failureHandler) {
this.failureHandler = failureHandler;
}

@Override
public BulkResponseInspector apply(InitContext context) {
return new DefaultBulkResponseInspector(failureHandler);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,13 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {

private static final Logger LOG = LoggerFactory.getLogger(OpensearchWriter.class);

public static final FailureHandler DEFAULT_FAILURE_HANDLER =
ex -> {
throw new FlinkRuntimeException(ex);
};

private final OpensearchEmitter<? super IN> emitter;
private final MailboxExecutor mailboxExecutor;
private final boolean flushOnCheckpoint;
private final BulkProcessor bulkProcessor;
private final RestHighLevelClient client;
private final RequestIndexer requestIndexer;
private final Counter numBytesOutCounter;
private final FailureHandler failureHandler;

private long pendingActions = 0;
private boolean checkpointInProgress = false;
Expand Down Expand Up @@ -102,7 +96,7 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
SinkWriterMetricGroup metricGroup,
MailboxExecutor mailboxExecutor,
RestClientFactory restClientFactory,
FailureHandler failureHandler) {
BulkResponseInspector bulkResponseInspector) {
this.emitter = checkNotNull(emitter);
this.flushOnCheckpoint = flushOnCheckpoint;
this.mailboxExecutor = checkNotNull(mailboxExecutor);
Expand All @@ -113,7 +107,8 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
builder, new DefaultRestClientConfig(networkClientConfig));

this.client = new RestHighLevelClient(builder);
this.bulkProcessor = createBulkProcessor(bulkProcessorConfig);
this.bulkProcessor =
createBulkProcessor(bulkProcessorConfig, checkNotNull(bulkResponseInspector));
this.requestIndexer = new DefaultRequestIndexer(metricGroup.getNumRecordsSendCounter());
checkNotNull(metricGroup);
metricGroup.setCurrentSendTimeGauge(() -> ackTime - lastSendTime);
Expand All @@ -123,7 +118,6 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
} catch (Exception e) {
throw new FlinkRuntimeException("Failed to open the OpensearchEmitter", e);
}
this.failureHandler = failureHandler;
}

@Override
Expand Down Expand Up @@ -163,7 +157,8 @@ public void close() throws Exception {
client.close();
}

private BulkProcessor createBulkProcessor(BulkProcessorConfig bulkProcessorConfig) {
private BulkProcessor createBulkProcessor(
BulkProcessorConfig bulkProcessorConfig, BulkResponseInspector bulkResponseInspector) {

final BulkProcessor.Builder builder =
BulkProcessor.builder(
Expand All @@ -180,7 +175,7 @@ public void accept(
bulkResponseActionListener);
}
},
new BulkListener());
new BulkListener(bulkResponseInspector));

if (bulkProcessorConfig.getBulkFlushMaxActions() != -1) {
builder.setBulkActions(bulkProcessorConfig.getBulkFlushMaxActions());
Expand Down Expand Up @@ -223,6 +218,12 @@ public void accept(

private class BulkListener implements BulkProcessor.Listener {

private final BulkResponseInspector bulkResponseInspector;

public BulkListener(BulkResponseInspector bulkResponseInspector) {
this.bulkResponseInspector = bulkResponseInspector;
}

@Override
public void beforeBulk(long executionId, BulkRequest request) {
LOG.info("Sending bulk of {} actions to Opensearch.", request.numberOfActions());
Expand All @@ -245,6 +246,11 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
},
"opensearchErrorCallback");
}

private void extractFailures(BulkRequest request, BulkResponse response) {
bulkResponseInspector.inspect(request, response);
pendingActions -= request.numberOfActions();
}
}

private void enqueueActionInMailbox(
Expand All @@ -259,35 +265,6 @@ private void enqueueActionInMailbox(
mailboxExecutor.execute(action, actionName);
}

private void extractFailures(BulkRequest request, BulkResponse response) {
if (!response.hasFailures()) {
pendingActions -= request.numberOfActions();
return;
}

Throwable chainedFailures = null;
for (int i = 0; i < response.getItems().length; i++) {
final BulkItemResponse itemResponse = response.getItems()[i];
if (!itemResponse.isFailed()) {
continue;
}
final Throwable failure = itemResponse.getFailure().getCause();
if (failure == null) {
continue;
}
final RestStatus restStatus = itemResponse.getFailure().getStatus();
final DocWriteRequest<?> actionRequest = request.requests().get(i);

chainedFailures =
firstOrSuppressed(
wrapException(restStatus, failure, actionRequest), chainedFailures);
}
if (chainedFailures == null) {
return;
}
failureHandler.onFailure(chainedFailures);
}

private static Throwable wrapException(
RestStatus restStatus, Throwable rootFailure, DocWriteRequest<?> actionRequest) {
if (restStatus == null) {
Expand Down Expand Up @@ -345,4 +322,61 @@ public void add(UpdateRequest... updateRequests) {
}
}
}

/**
* A strict implementation that fails if either the whole bulk request failed or any of its
* actions.
*/
static class DefaultBulkResponseInspector implements BulkResponseInspector {

@VisibleForTesting final FailureHandler failureHandler;

DefaultBulkResponseInspector() {
this(new DefaultFailureHandler());
}

DefaultBulkResponseInspector(FailureHandler failureHandler) {
this.failureHandler = checkNotNull(failureHandler);
}

@Override
public void inspect(BulkRequest request, BulkResponse response) {
if (!response.hasFailures()) {
return;
}

Throwable chainedFailures = null;
for (int i = 0; i < response.getItems().length; i++) {
final BulkItemResponse itemResponse = response.getItems()[i];
if (!itemResponse.isFailed()) {
continue;
}
final Throwable failure = itemResponse.getFailure().getCause();
if (failure == null) {
continue;
}
final RestStatus restStatus = itemResponse.getFailure().getStatus();
final DocWriteRequest<?> actionRequest = request.requests().get(i);

chainedFailures =
firstOrSuppressed(
wrapException(restStatus, failure, actionRequest), chainedFailures);
}
if (chainedFailures == null) {
return;
}
failureHandler.onFailure(chainedFailures);
}
}

static class DefaultFailureHandler implements FailureHandler {

@Override
public void onFailure(Throwable failure) {
if (failure instanceof FlinkRuntimeException) {
throw (FlinkRuntimeException) failure;
}
throw new FlinkRuntimeException(failure);
}
}
}
Loading

0 comments on commit 9e161cc

Please sign in to comment.