From 60bf284c7cd164ec727a5c4537b29c0201ece2f2 Mon Sep 17 00:00:00 2001 From: Logic-32 <25107222+Logic-32@users.noreply.github.com> Date: Tue, 23 Apr 2019 15:47:49 -0600 Subject: [PATCH] Adding test to verify behavior when a 429 status is returned from ES. --- .../internal/HttpBulkIndexerTest.java | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/HttpBulkIndexerTest.java diff --git a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/HttpBulkIndexerTest.java b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/HttpBulkIndexerTest.java new file mode 100644 index 00000000000..670d0a21278 --- /dev/null +++ b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/HttpBulkIndexerTest.java @@ -0,0 +1,35 @@ +/* + * Copyright 2015-2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.elasticsearch.internal; + +import java.io.IOException; +import java.util.concurrent.RejectedExecutionException; +import okio.Buffer; +import okio.ByteString; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import zipkin2.elasticsearch.internal.HttpBulkIndexer.CheckForErrors; + +public class HttpBulkIndexerTest { + @Rule public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void throwRejectedExecutionExceptionWhenOverCapacity() throws IOException { + Buffer response = new Buffer().write(ByteString.encodeUtf8("{\"took\":0,\"errors\":true,\"items\":[{\"index\":{\"_index\":\"dev-zipkin:span-2019.04.18\",\"_type\":\"span\",\"_id\":\"2511\",\"status\":429,\"error\":{\"type\":\"es_rejected_execution_exception\",\"reason\":\"rejected execution of org.elasticsearch.transport.TransportService$7@7ec1ea93 on EsThreadPoolExecutor[bulk, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@621571ba[Running, pool size = 4, active threads = 4, queued tasks = 200, completed tasks = 3838534]]\"}}}]}")); + + expectedException.expect(RejectedExecutionException.class); + CheckForErrors.INSTANCE.convert(response); + } +}