From d6b2bf682b980e9d30e86bd6bb169e63c3f278ab Mon Sep 17 00:00:00 2001 From: Logic-32 <25107222+Logic-32@users.noreply.github.com> Date: Thu, 18 Apr 2019 10:15:23 -0600 Subject: [PATCH 1/3] Updating Elasticsearch's HttpBulkIndexer to check for "too many requests" error and throw more descriptive exception. --- .../java/zipkin2/elasticsearch/internal/HttpBulkIndexer.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/HttpBulkIndexer.java b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/HttpBulkIndexer.java index 06c7fe46809..08d21f59591 100644 --- a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/HttpBulkIndexer.java +++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/HttpBulkIndexer.java @@ -21,6 +21,7 @@ import java.util.Iterator; import java.util.LinkedHashSet; import java.util.Set; +import java.util.concurrent.RejectedExecutionException; import okhttp3.HttpUrl; import okhttp3.MediaType; import okhttp3.Request; @@ -77,6 +78,7 @@ enum CheckForErrors implements HttpCall.BodyConverter { @Override public Void convert(BufferedSource b) throws IOException { String content = b.readUtf8(); + if (content.contains("\"status\":429")) throw new RejectedExecutionException(content); if (content.contains("\"errors\":true")) throw new IllegalStateException(content); return null; } From b8d75230e64bf918c5a9a9d2fa42e0e4d52cf670 Mon Sep 17 00:00:00 2001 From: Logic-32 <25107222+Logic-32@users.noreply.github.com> Date: Tue, 23 Apr 2019 15:47:49 -0600 Subject: [PATCH 2/3] Adding test to verify behavior when a 429 status is returned from ES. --- .../internal/HttpBulkIndexerTest.java | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/HttpBulkIndexerTest.java diff --git a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/HttpBulkIndexerTest.java b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/HttpBulkIndexerTest.java new file mode 100644 index 00000000000..670d0a21278 --- /dev/null +++ b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/HttpBulkIndexerTest.java @@ -0,0 +1,35 @@ +/* + * Copyright 2015-2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.elasticsearch.internal; + +import java.io.IOException; +import java.util.concurrent.RejectedExecutionException; +import okio.Buffer; +import okio.ByteString; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import zipkin2.elasticsearch.internal.HttpBulkIndexer.CheckForErrors; + +public class HttpBulkIndexerTest { + @Rule public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void throwRejectedExecutionExceptionWhenOverCapacity() throws IOException { + Buffer response = new Buffer().write(ByteString.encodeUtf8("{\"took\":0,\"errors\":true,\"items\":[{\"index\":{\"_index\":\"dev-zipkin:span-2019.04.18\",\"_type\":\"span\",\"_id\":\"2511\",\"status\":429,\"error\":{\"type\":\"es_rejected_execution_exception\",\"reason\":\"rejected execution of org.elasticsearch.transport.TransportService$7@7ec1ea93 on EsThreadPoolExecutor[bulk, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@621571ba[Running, pool size = 4, active threads = 4, queued tasks = 200, completed tasks = 3838534]]\"}}}]}")); + + expectedException.expect(RejectedExecutionException.class); + CheckForErrors.INSTANCE.convert(response); + } +} From b1e119ca2507226cd091e0f1829e05a24ec6fc7a Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Wed, 24 Apr 2019 11:45:01 +0800 Subject: [PATCH 3/3] fixes license drift --- .../internal/HttpBulkIndexerTest.java | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/HttpBulkIndexerTest.java b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/HttpBulkIndexerTest.java index 670d0a21278..0f8589dc970 100644 --- a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/HttpBulkIndexerTest.java +++ b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/HttpBulkIndexerTest.java @@ -1,15 +1,18 @@ /* - * Copyright 2015-2019 The OpenZipkin Authors + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package zipkin2.elasticsearch.internal;