diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/HttpBulkIndexer.java b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/HttpBulkIndexer.java index 06c7fe46809..08d21f59591 100644 --- a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/HttpBulkIndexer.java +++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/HttpBulkIndexer.java @@ -21,6 +21,7 @@ import java.util.Iterator; import java.util.LinkedHashSet; import java.util.Set; +import java.util.concurrent.RejectedExecutionException; import okhttp3.HttpUrl; import okhttp3.MediaType; import okhttp3.Request; @@ -77,6 +78,7 @@ enum CheckForErrors implements HttpCall.BodyConverter { @Override public Void convert(BufferedSource b) throws IOException { String content = b.readUtf8(); + if (content.contains("\"status\":429")) throw new RejectedExecutionException(content); if (content.contains("\"errors\":true")) throw new IllegalStateException(content); return null; } diff --git a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/HttpBulkIndexerTest.java b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/HttpBulkIndexerTest.java new file mode 100644 index 00000000000..0f8589dc970 --- /dev/null +++ b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/HttpBulkIndexerTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package zipkin2.elasticsearch.internal; + +import java.io.IOException; +import java.util.concurrent.RejectedExecutionException; +import okio.Buffer; +import okio.ByteString; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import zipkin2.elasticsearch.internal.HttpBulkIndexer.CheckForErrors; + +public class HttpBulkIndexerTest { + @Rule public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void throwRejectedExecutionExceptionWhenOverCapacity() throws IOException { + Buffer response = new Buffer().write(ByteString.encodeUtf8("{\"took\":0,\"errors\":true,\"items\":[{\"index\":{\"_index\":\"dev-zipkin:span-2019.04.18\",\"_type\":\"span\",\"_id\":\"2511\",\"status\":429,\"error\":{\"type\":\"es_rejected_execution_exception\",\"reason\":\"rejected execution of org.elasticsearch.transport.TransportService$7@7ec1ea93 on EsThreadPoolExecutor[bulk, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@621571ba[Running, pool size = 4, active threads = 4, queued tasks = 200, completed tasks = 3838534]]\"}}}]}")); + + expectedException.expect(RejectedExecutionException.class); + CheckForErrors.INSTANCE.convert(response); + } +}