Skip to content

Commit

Permalink
Fix up trappy timeouts related to ingest pipelines
Browse files Browse the repository at this point in the history
  • Loading branch information
DaveCTurner committed Sep 16, 2024
1 parent 49dcc16 commit 5b1c5fe
Show file tree
Hide file tree
Showing 15 changed files with 172 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;

import java.io.IOException;
import java.util.Objects;
Expand All @@ -20,8 +21,8 @@ public class DeletePipelineRequest extends AcknowledgedRequest<DeletePipelineReq

private String id;

public DeletePipelineRequest(String id) {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT, DEFAULT_ACK_TIMEOUT);
public DeletePipelineRequest(TimeValue masterNodeTimeout, TimeValue ackTimeout, String id) {
super(masterNodeTimeout, ackTimeout);
if (id == null) {
throw new IllegalArgumentException("id is missing");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;

import java.io.IOException;

Expand All @@ -21,17 +22,17 @@ public class GetPipelineRequest extends MasterNodeReadRequest<GetPipelineRequest
private final String[] ids;
private final boolean summary;

public GetPipelineRequest(boolean summary, String... ids) {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT);
public GetPipelineRequest(TimeValue masterNodeTimeout, boolean summary, String... ids) {
super(masterNodeTimeout);
if (ids == null) {
throw new IllegalArgumentException("ids cannot be null");
}
this.ids = ids;
this.summary = summary;
}

public GetPipelineRequest(String... ids) {
this(false, ids);
public GetPipelineRequest(TimeValue masterNodeTimeout, String... ids) {
this(masterNodeTimeout, false, ids);
}

public GetPipelineRequest(StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentType;
Expand All @@ -31,16 +32,29 @@ public class PutPipelineRequest extends AcknowledgedRequest<PutPipelineRequest>
/**
* Create a new pipeline request with the id and source along with the content type of the source
*/
public PutPipelineRequest(String id, BytesReference source, XContentType xContentType, Integer version) {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT, DEFAULT_ACK_TIMEOUT);
public PutPipelineRequest(
TimeValue masterNodeTimeout,
TimeValue ackTimeout,
String id,
BytesReference source,
XContentType xContentType,
Integer version
) {
super(masterNodeTimeout, ackTimeout);
this.id = Objects.requireNonNull(id);
this.source = Objects.requireNonNull(source);
this.xContentType = Objects.requireNonNull(xContentType);
this.version = version;
}

public PutPipelineRequest(String id, BytesReference source, XContentType xContentType) {
this(id, source, xContentType, null);
public PutPipelineRequest(
TimeValue masterNodeTimeout,
TimeValue ackTimeout,
String id,
BytesReference source,
XContentType xContentType
) {
this(masterNodeTimeout, ackTimeout, id, source, xContentType, null);
}

public PutPipelineRequest(StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,14 @@ public TransformState transform(Object source, TransformState prevState) throws
toDelete.removeAll(entities);

for (var pipelineToDelete : toDelete) {
var task = new IngestService.DeletePipelineClusterStateUpdateTask(null, new DeletePipelineRequest(pipelineToDelete));
var task = new IngestService.DeletePipelineClusterStateUpdateTask(
null,
new DeletePipelineRequest(
RESERVED_CLUSTER_STATE_HANDLER_IGNORED_TIMEOUT,
RESERVED_CLUSTER_STATE_HANDLER_IGNORED_TIMEOUT,
pipelineToDelete
)
);
state = wrapIngestTaskExecute(task, state);
}

Expand All @@ -119,7 +126,15 @@ public List<PutPipelineRequest> fromXContent(XContentParser parser) throws IOExc
Map<String, ?> content = (Map<String, ?>) source.get(id);
try (XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON)) {
builder.map(content);
result.add(new PutPipelineRequest(id, BytesReference.bytes(builder), XContentType.JSON));
result.add(
new PutPipelineRequest(
RESERVED_CLUSTER_STATE_HANDLER_IGNORED_TIMEOUT,
RESERVED_CLUSTER_STATE_HANDLER_IGNORED_TIMEOUT,
id,
BytesReference.bytes(builder),
XContentType.JSON
)
);
} catch (Exception e) {
throw new ElasticsearchGenerationException("Failed to generate [" + source + "]", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ public String getName() {

@Override
public RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
DeletePipelineRequest request = new DeletePipelineRequest(restRequest.param("id"));
request.masterNodeTimeout(getMasterNodeTimeout(restRequest));
request.ackTimeout(getAckTimeout(restRequest));
final var request = new DeletePipelineRequest(
getMasterNodeTimeout(restRequest),
getAckTimeout(restRequest),
restRequest.param("id")
);
return channel -> client.execute(DeletePipelineTransportAction.TYPE, request, new RestToXContentListener<>(channel));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ public String getName() {

@Override
public RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
GetPipelineRequest request = new GetPipelineRequest(
final var request = new GetPipelineRequest(
getMasterNodeTimeout(restRequest),
restRequest.paramAsBoolean("summary", false),
Strings.splitStringByCommaToArray(restRequest.param("id"))
);
request.masterNodeTimeout(getMasterNodeTimeout(restRequest));
return channel -> client.execute(
GetPipelineAction.INSTANCE,
request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,14 @@ public RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient cl
}

Tuple<XContentType, BytesReference> sourceTuple = restRequest.contentOrSourceParam();
PutPipelineRequest request = new PutPipelineRequest(restRequest.param("id"), sourceTuple.v2(), sourceTuple.v1(), ifVersion);
request.masterNodeTimeout(getMasterNodeTimeout(restRequest));
request.ackTimeout(getAckTimeout(restRequest));
final var request = new PutPipelineRequest(
getMasterNodeTimeout(restRequest),
getAckTimeout(restRequest),
restRequest.param("id"),
sourceTuple.v2(),
sourceTuple.v1(),
ifVersion
);
return channel -> client.execute(PutPipelineTransportAction.TYPE, request, new RestToXContentListener<>(channel));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,13 @@ public void testToXContent() throws IOException {
// End first processor
pipelineBuilder.endArray();
pipelineBuilder.endObject();
PutPipelineRequest request = new PutPipelineRequest("1", BytesReference.bytes(pipelineBuilder), xContentType);
PutPipelineRequest request = new PutPipelineRequest(
TEST_REQUEST_TIMEOUT,
TEST_REQUEST_TIMEOUT,
"1",
BytesReference.bytes(pipelineBuilder),
xContentType
);
XContentBuilder requestBuilder = XContentBuilder.builder(xContentType.xContent());
BytesReference actualRequestBody = BytesReference.bytes(request.toXContent(requestBuilder, ToXContent.EMPTY_PARAMS));
assertEquals(BytesReference.bytes(pipelineBuilder), actualRequestBody);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ public void testDelete() {
assertThat(ingestService.getPipeline("_id"), notNullValue());

// Delete pipeline:
DeletePipelineRequest deleteRequest = new DeletePipelineRequest("_id");
DeletePipelineRequest deleteRequest = new DeletePipelineRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, "_id");
previousClusterState = clusterState;
clusterState = executeDelete(deleteRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
Expand Down Expand Up @@ -713,7 +713,7 @@ public void testCrud() throws Exception {
assertThat(pipeline.getProcessors().size(), equalTo(1));
assertThat(pipeline.getProcessors().get(0).getType(), equalTo("set"));

DeletePipelineRequest deleteRequest = new DeletePipelineRequest(id);
DeletePipelineRequest deleteRequest = new DeletePipelineRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, id);
previousClusterState = clusterState;
clusterState = executeDelete(deleteRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
Expand Down Expand Up @@ -803,7 +803,7 @@ public void testDeleteUsingWildcard() {
assertThat(ingestService.getPipeline("q1"), notNullValue());

// Delete pipeline matching wildcard
DeletePipelineRequest deleteRequest = new DeletePipelineRequest("p*");
DeletePipelineRequest deleteRequest = new DeletePipelineRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, "p*");
previousClusterState = clusterState;
clusterState = executeDelete(deleteRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
Expand All @@ -816,13 +816,16 @@ public void testDeleteUsingWildcard() {
assertThat(
expectThrows(
ResourceNotFoundException.class,
() -> executeFailingDelete(new DeletePipelineRequest("unknown"), finalClusterState)
() -> executeFailingDelete(
new DeletePipelineRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, "unknown"),
finalClusterState
)
).getMessage(),
equalTo("pipeline [unknown] is missing")
);

// match all wildcard works on last remaining pipeline
DeletePipelineRequest matchAllDeleteRequest = new DeletePipelineRequest("*");
DeletePipelineRequest matchAllDeleteRequest = new DeletePipelineRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, "*");
previousClusterState = clusterState;
clusterState = executeDelete(matchAllDeleteRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
Expand Down Expand Up @@ -851,8 +854,10 @@ public void testDeleteWithExistingUnmatchedPipelines() {

ClusterState finalClusterState = clusterState;
assertThat(
expectThrows(ResourceNotFoundException.class, () -> executeFailingDelete(new DeletePipelineRequest("z*"), finalClusterState))
.getMessage(),
expectThrows(
ResourceNotFoundException.class,
() -> executeFailingDelete(new DeletePipelineRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, "z*"), finalClusterState)
).getMessage(),
equalTo("pipeline [z*] is missing")
);
}
Expand All @@ -878,7 +883,7 @@ public void testDeleteWithIndexUsePipeline() {
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
assertThat(ingestService.getPipeline("_id"), notNullValue());

DeletePipelineRequest deleteRequest = new DeletePipelineRequest("_id");
DeletePipelineRequest deleteRequest = new DeletePipelineRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, "_id");

{
// delete pipeline which is in used of default_pipeline
Expand Down Expand Up @@ -2637,7 +2642,14 @@ public void testPutPipelineWithVersionedUpdateWithoutExistingPipeline() {

final Integer version = randomInt();
var pipelineString = "{\"version\": " + version + ", \"processors\": []}";
var request = new PutPipelineRequest(pipelineId, new BytesArray(pipelineString), XContentType.JSON, version);
var request = new PutPipelineRequest(
TEST_REQUEST_TIMEOUT,
TEST_REQUEST_TIMEOUT,
pipelineId,
new BytesArray(pipelineString),
XContentType.JSON,
version
);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> executeFailingPut(request, clusterState));
assertThat(
e.getMessage(),
Expand All @@ -2662,7 +2674,14 @@ public void testPutPipelineWithVersionedUpdateDoesNotMatchExistingPipeline() {
.build();

final Integer requestedVersion = randomValueOtherThan(version, ESTestCase::randomInt);
var request = new PutPipelineRequest(pipelineId, new BytesArray(pipelineString), XContentType.JSON, requestedVersion);
var request = new PutPipelineRequest(
TEST_REQUEST_TIMEOUT,
TEST_REQUEST_TIMEOUT,
pipelineId,
new BytesArray(pipelineString),
XContentType.JSON,
requestedVersion
);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> executeFailingPut(request, clusterState));
assertThat(
e.getMessage(),
Expand All @@ -2687,7 +2706,14 @@ public void testPutPipelineWithVersionedUpdateSpecifiesSameVersion() throws Exce
.metadata(Metadata.builder().putCustom(IngestMetadata.TYPE, new IngestMetadata(Map.of(pipelineId, existingPipeline))).build())
.build();

var request = new PutPipelineRequest(pipelineId, new BytesArray(pipelineString), XContentType.JSON, version);
var request = new PutPipelineRequest(
TEST_REQUEST_TIMEOUT,
TEST_REQUEST_TIMEOUT,
pipelineId,
new BytesArray(pipelineString),
XContentType.JSON,
version
);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> executeFailingPut(request, clusterState));
assertThat(e.getMessage(), equalTo(Strings.format("cannot update pipeline [%s] with the same version [%s]", pipelineId, version)));
}
Expand All @@ -2703,7 +2729,14 @@ public void testPutPipelineWithVersionedUpdateSpecifiesValidVersion() throws Exc

final int specifiedVersion = randomValueOtherThan(existingVersion, ESTestCase::randomInt);
var updatedPipelineString = "{\"version\": " + specifiedVersion + ", \"processors\": []}";
var request = new PutPipelineRequest(pipelineId, new BytesArray(updatedPipelineString), XContentType.JSON, existingVersion);
var request = new PutPipelineRequest(
TEST_REQUEST_TIMEOUT,
TEST_REQUEST_TIMEOUT,
pipelineId,
new BytesArray(updatedPipelineString),
XContentType.JSON,
existingVersion
);
var updatedState = executePut(request, clusterState);

var updatedConfig = ((IngestMetadata) updatedState.metadata().custom(IngestMetadata.TYPE)).getPipelines().get(pipelineId);
Expand All @@ -2721,7 +2754,14 @@ public void testPutPipelineWithVersionedUpdateIncrementsVersion() throws Excepti
.build();

var updatedPipelineString = "{\"processors\": []}";
var request = new PutPipelineRequest(pipelineId, new BytesArray(updatedPipelineString), XContentType.JSON, existingVersion);
var request = new PutPipelineRequest(
TEST_REQUEST_TIMEOUT,
TEST_REQUEST_TIMEOUT,
pipelineId,
new BytesArray(updatedPipelineString),
XContentType.JSON,
existingVersion
);
var updatedState = executePut(request, clusterState);

var updatedConfig = ((IngestMetadata) updatedState.metadata().custom(IngestMetadata.TYPE)).getPipelines().get(pipelineId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.io.IOException;

import static org.elasticsearch.test.ESTestCase.TEST_REQUEST_TIMEOUT;
import static org.elasticsearch.test.ESTestCase.safeGet;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
Expand All @@ -46,7 +47,7 @@ private IngestPipelineTestUtils() { /* no instances */ }
* @return a new {@link PutPipelineRequest} with the given {@code id} and body.
*/
public static PutPipelineRequest putJsonPipelineRequest(String id, BytesReference source) {
return new PutPipelineRequest(id, source, XContentType.JSON);
return new PutPipelineRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, id, source, XContentType.JSON);
}

/**
Expand Down Expand Up @@ -103,19 +104,23 @@ public static void putJsonPipeline(ElasticsearchClient client, String id, ToXCon
public static void deletePipelinesIgnoringExceptions(ElasticsearchClient client, Iterable<String> ids) {
for (final var id : ids) {
ESTestCase.safeAwait(
l -> client.execute(DeletePipelineTransportAction.TYPE, new DeletePipelineRequest(id), new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
logger.info("delete pipeline [{}] success [acknowledged={}]", id, acknowledgedResponse.isAcknowledged());
l.onResponse(null);
}
l -> client.execute(
DeletePipelineTransportAction.TYPE,
new DeletePipelineRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, id),
new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
logger.info("delete pipeline [{}] success [acknowledged={}]", id, acknowledgedResponse.isAcknowledged());
l.onResponse(null);
}

@Override
public void onFailure(Exception e) {
logger.warn(Strings.format("delete pipeline [%s] failure", id), e);
l.onResponse(null);
@Override
public void onFailure(Exception e) {
logger.warn(Strings.format("delete pipeline [%s] failure", id), e);
l.onResponse(null);
}
}
})
)
);
}
}
Expand Down
Loading

0 comments on commit 5b1c5fe

Please sign in to comment.