
Spans lost with kafka and elasticsearch (over capacity) #2023

Open
manuelluis opened this issue Apr 26, 2018 · 32 comments

@manuelluis

What kind of issue is this?

  • [x] Bug report.

We have Zipkin 2.7.1 reading from Kafka with zipkin-autoconfigure-collector-kafka10-2.7.1-module.jar, and the storage is Elasticsearch.

With a high number of spans, we found that we are losing spans. We are checking the metric:
counter.zipkin_collector.spans_dropped.kafka

In the logs we get the exception:

10:13:25.604 [pool-1-thread-1] DEBUG z.collector.kafka10.KafkaCollector - Cannot store spans [1420afe62427f4ce559a0e797bb1b777/13468e1e3b2d9e26] due to IllegalStateException(over capacity)
java.lang.IllegalStateException: over capacity
	at zipkin2.elasticsearch.internal.client.HttpCall.enqueue(HttpCall.java:87)
	at zipkin.internal.V2Collector.record(V2Collector.java:57)
	at zipkin.internal.Collector.accept(Collector.java:79)
	at zipkin.collector.Collector.accept(Collector.java:145)
	at zipkin.collector.kafka10.KafkaCollectorWorker.run(KafkaCollectorWorker.java:84)
	at zipkin.collector.kafka10.KafkaCollector$LazyKafkaWorkers$1.run(KafkaCollector.java:196)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

We increased ES_MAX_REQUESTS, but it did not fix the problem.

@Override public void enqueue(Callback<V> delegate) {
  if (!semaphore.tryAcquire()) {
    delegate.onError(new IllegalStateException("over capacity"));
    return;
  }
  call.enqueue(new V2CallbackAdapter<>(semaphore, bodyConverter, delegate));
}

I think that, in the case of reading spans from Kafka, the write should block when semaphore.tryAcquire() fails, or the Kafka reader should be able to use a new call that writes the spans to storage and blocks until there are resources available in the storage.

With Kafka we have buffering, so we don't have the problems of an HTTP request writing to storage; we can easily apply back pressure by not reading from Kafka while the storage is saturated, without losing any span.
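
A minimal sketch of that back-pressure idea (not the actual KafkaCollectorWorker code; the class and field names here are only illustrative): pause the consumer while the storage semaphore has no free permits, so no records are polled and none are dropped.

import java.time.Duration;
import java.util.concurrent.Semaphore;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

class BackPressurePollLoop {
  final KafkaConsumer<byte[], byte[]> consumer; // assumed already subscribed to the zipkin topic
  final Semaphore storagePermits;               // would mirror the semaphore used by HttpCall

  BackPressurePollLoop(KafkaConsumer<byte[], byte[]> consumer, Semaphore storagePermits) {
    this.consumer = consumer;
    this.storagePermits = storagePermits;
  }

  void pollOnce() {
    if (storagePermits.availablePermits() == 0) {
      consumer.pause(consumer.assignment());  // stay in the consumer group, but fetch no records
    } else {
      consumer.resume(consumer.assignment());
    }
    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
    // decode and store the records here; while paused, this returns an empty batch
  }
}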

@codefromthecrypt
Member

I agree this is an issue that, with better knowledge about the collector, we could handle differently. For example, we could do blocking writes.

@malonso1976

Hi,
in order to do so:

@Override public void enqueue(Callback<V> delegate) {
  try {
    semaphore.acquire();
    call.enqueue(new V2CallbackAdapter<>(semaphore, bodyConverter, delegate));
  } catch (InterruptedException e) {
    delegate.onError(e);
  }
}

As a result, spans are no longer dropped under heavy load. Manuel and I have already tested it.

@codefromthecrypt
Member

codefromthecrypt commented May 4, 2018 via email

@malonso1976

OK, please let me know if you are interested in a pull request for this issue with Elasticsearch, and feel free to discuss the details.

@codefromthecrypt
Member

PS: I think the easiest way is to add a CollectorComponent.blockOnStorage option. This would allow us to wait until another request slot is available, and it would be suitable for most storage implementations. Even if Elasticsearch returns unavailable, it will at least wait for that to occur.
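
As a purely hypothetical sketch (blockOnStorage is not an existing Zipkin option; this only illustrates switching between dropping and blocking), such a flag could change HttpCall.enqueue like this:

@Override public void enqueue(Callback<V> delegate) {
  if (blockOnStorage) {                   // hypothetical flag, not an existing setting
    try {
      semaphore.acquire();                // wait for a free request slot
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      delegate.onError(e);
      return;
    }
  } else if (!semaphore.tryAcquire()) {   // current behaviour: fail fast
    delegate.onError(new IllegalStateException("over capacity"));
    return;
  }
  call.enqueue(new V2CallbackAdapter<>(semaphore, bodyConverter, delegate));
}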

@afalko
Contributor

afalko commented Aug 7, 2018

We are facing the same problem. @malonso1976 were you still hoping to create a pull request?

@codefromthecrypt
Member

codefromthecrypt commented Aug 8, 2018 via email

@malonso1976

Sure, I will. I hope to find time in the next few days.

@malonso1976

I have a pull request ready to be submitted. I have no permission to push it to this repo; how can I send it back to you?

@codefromthecrypt
Member

codefromthecrypt commented Aug 10, 2018 via email

@malonso1976

Done

@DanMelman

DanMelman commented Jan 24, 2019

Hi,
We were getting the exact same exception on our production system, with some spans lost and not written to Elasticsearch. I tried the above-mentioned fix by forking from @malonso1976 and deploying the code from the new branch, but unfortunately it does not seem to solve the problem (we still get the exception).

I tried another approach: in the HttpCall class, inside enqueue, I gave tryAcquire a timeout of a few seconds. That seems to solve the problem for us. Let me know what you think.
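
A sketch of that timeout variant of HttpCall.enqueue (the five-second value is only illustrative; TimeUnit is java.util.concurrent.TimeUnit):

@Override public void enqueue(Callback<V> delegate) {
  try {
    // wait briefly for a permit instead of failing immediately
    if (!semaphore.tryAcquire(5, TimeUnit.SECONDS)) {
      delegate.onError(new IllegalStateException("over capacity"));
      return;
    }
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    delegate.onError(e);
    return;
  }
  call.enqueue(new V2CallbackAdapter<>(semaphore, bodyConverter, delegate));
}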

@malonso1976

Hi Dan,
my first approach was to replace tryAcquire with acquire, without a timeout, to block until done. That works.

But Adrian told me to use concurrency-limits because it would also cover another scenario: direct Zipkin span writes through the REST API. It makes sense... I could not test it under load, but it is on my task list. Have you set up your Elasticsearch capacity settings according to the concurrency queue?

@DanMelman

By "concurrency queue" you refer to the "limit"/"concurrency" in the configuration?
Let me know how can I contact you directly, I can do some runs under load.

@malonso1976

Sure, write your email in this thread and I will mail you back.

@Logic-32
Contributor

The tryAcquire() approach with a timeout should not have any negative impact on those using the HTTP API as long as the timeout is kept reasonable. I'm not sure what reasonable is, but <= 1m should be good. That'll give Elasticsearch enough time to flush some stuff but not cause a huge backup of HTTP buffer items.

If I get a chance I'll try it on my end and see what happens. No promises on the timeline :(

@pg-yang

pg-yang commented Apr 2, 2019

Has it been fixed?

@codefromthecrypt
Member

codefromthecrypt commented Apr 2, 2019 via email

@Logic-32
Contributor

Logic-32 commented Apr 2, 2019

I'm currently working on something that should help minimize the impact of this. The general idea is to allow Zipkin to do some buffering rather than just have a hard cutoff. This should help greatly in times of request spikes and sluggishness on the ES side.

It won't address any of the items you mentioned @adriancole, but it should still help. What I need to know, if anyone can offer some advice, is how to get a config property into HttpCall. Currently it's kind of inferred from OkHttp's configuration, which is not fantastic for general config properties.

@codefromthecrypt
Member

codefromthecrypt commented Apr 2, 2019 via email

@Logic-32
Contributor

Logic-32 commented Apr 2, 2019

The high-level plan is to replace the existing Semaphore in HttpCall with something that allows for a queue of some kind.

My original plan was to update the OkHttpClient.Builder in ZipkinElasticsearchOkHttpAutoConfiguration to have a Dispatcher with an ExecutorService that allowed for a fixed-length queue. Then I realized that the Dispatcher maintains its own unbounded queue (which is a real problem and should be fixed/configurable in OkHttp itself, but I digress), so the ExecutorService doesn't help.

Now my general idea is to just inspect the Dispatcher queue length to see if we exceed a certain threshold (this is where I want an autoconfigure property but can't easily get one) and drop requests if we do. Given that ZipkinElasticsearchOkHttpAutoConfiguration exists, I assumed it'd be OK to have a setting from it accessible in HttpCall. But since I'm not seeing any obvious way to do the plumbing, I suppose it's better not to. However, if I don't, then I'm not quite sure where else to put the threshold check.
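
A rough sketch of that threshold check, where maxQueuedCalls stands in for the hypothetical autoconfigure property:

import java.io.IOException;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.Dispatcher;

class BoundedDispatcherEnqueue {
  final Dispatcher dispatcher; // the Dispatcher backing the Elasticsearch OkHttpClient
  final int maxQueuedCalls;    // hypothetical autoconfigure property

  BoundedDispatcherEnqueue(Dispatcher dispatcher, int maxQueuedCalls) {
    this.dispatcher = dispatcher;
    this.maxQueuedCalls = maxQueuedCalls;
  }

  void enqueueOrDrop(Call call, Callback callback) {
    // queuedCallsCount() is the number of calls waiting behind OkHttp's concurrency limits
    if (dispatcher.queuedCallsCount() >= maxQueuedCalls) {
      callback.onFailure(call, new IOException("over capacity"));
      return;
    }
    call.enqueue(callback);
  }
}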

I could always wrap everything up in an ExecutorService somewhere (like ElasticsearchSpanConsumer) so that things are easy to control. But I'm not sure how I'd properly transition all existing configurations over to using that without some unexpected behavior.

My main concern here is the usage of the HttpBulkIndexer in ElasticsearchSpanConsumer. The reason it is my primary concern is the issue that brought about #1760. The fix for that was, perhaps unfortunately, too broad and limited any interaction with ES. The change I'm proposing is more focused on the actual collection of spans, which can cause backups and OOM errors.

To emphasize one point: the purpose of the queue is to help work through spike load and semi-sluggish flushing to ES. OkHttp's unbounded queue seems like the real source of the problem in #1760 and what I think the Semaphore was trying to work around; that's what I want to work around too, but in a less aggressive way.

Unfortunately, I'm not sure how to relate any of this back to Kafka because we report to Zipkin over HTTP and the ElasticsearchSpanConsumer doesn't seem to care about whether you do that or use Kafka.

@Logic-32
Contributor

Logic-32 commented Apr 2, 2019

Until I get a response to my above comment, I think I'll pursue the idea of removing the Semaphore and adding an ExecutorService in ElasticsearchSpanConsumer (or ElasticsearchStorage) to help decouple the ES component from OkHttp. Hopefully that addresses your initial concern @adriancole. Regardless, we can always iterate on it a bit.
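
A minimal sketch of such a bounded executor, with illustrative (not recommended) pool size and queue depth:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedStorageExecutor {
  static ExecutorService create() {
    return new ThreadPoolExecutor(
        4, 4,                                  // a fixed number of writer threads
        0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<>(1000),        // bounded backlog instead of an unbounded queue
        new ThreadPoolExecutor.AbortPolicy()); // reject (count as dropped) once the backlog is full
  }
}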

@codefromthecrypt
Member

codefromthecrypt commented Apr 2, 2019 via email

@Logic-32
Contributor

Logic-32 commented Apr 2, 2019

Going to run it on our servers for a bit before even attempting a pull request to make sure it is stable. But when I commit it, it'll definitely be on a branch :)

Is there any reference material on the asynchronous reporter I can review to make sure I'm not reinventing the wheel? Is there an ETA on when that'll come out? And you're not talking about the reporter in Brave, correct?

@codefromthecrypt
Member

@Logic-32 The main thing is that there are so many different types of issues :) They should be addressed independently.

The backlog not being properly addressed in #1760 is simply a flaw or oversight. My guess is that you can create this as a test and break it. If it isn't a fault in our design (letting the queue grow unbounded), then we can also ask OkHttp about how to change it... there's sometimes a way. Can you raise an independent issue on the unbounded queue problem with the description you gave?

@codefromthecrypt
Member

On the async reporter... it has a bundling feature with a size limit on both the message size to the host and the backlog. What I mean is that if we're not using a durable queue, then we are back to memory. If in memory, we have to be careful about the implementation of the bounded queue so as not to OOM, and the AsyncReporter has code to do this. I would suggest possibly a reporter adapter to do the bulk message if we get to this point.

However, there is still the concern of knowing when you should retry, which can also be handled separately. ES will definitely fail for unknown amounts of time... So, normalizing the exception will help, especially as sometimes the cluster is just dead as opposed to saying the queue is full.

The main point is that there are several different issues, but opening one per case is a good idea so that they can be tracked. For example, just as you may not care about a durable queue, those with durable queues might not want the risk of another memory queue :P So, in other words, a memory-backed collector queue for storage overloads is its own concern.

I have to run, but it's easiest to chat on Gitter, too. Thanks for the help... I think there are some flaws we can fix.

@codefromthecrypt
Member

codefromthecrypt commented Apr 3, 2019 via email

@DanMelman

The tryAcquire() approach with a timeout should not have any negative impact on those using the HTTP API as long as the timeout is kept reasonable. I'm not sure what reasonable is, but <= 1m should be good. That'll give Elasticsearch enough time to flush some stuff but not cause a huge backup of HTTP buffer items.

If I get a chance I'll try it on my end and see what happens. No promises on the timeline :(

Sorry for the delay, would you like me to open a PR on that small adjustment (I know it is not the full solution you're looking for, but it does help in the case of HTTP)?

@codefromthecrypt
Member

codefromthecrypt commented Apr 10, 2019 via email

@Logic-32
Contributor

Shoot, that's right. The Semaphore is technically pre-enqueue, so it is on the ingestion thread, which would block sending a response. I didn't think about that earlier :(

FWIW, I'm testing my changes for #2481 in our production environment soon. Still no ETA on when I'll make a pull request and certainly no promises on it being acceptable. Plus there is some Apache stuff to work through now. But I'll try to stay on top of it :)

@codefromthecrypt
Member

@Logic-32 thanks for the status update and keep up the good work. This change has needed a champion for a while and I'm glad you are considering all angles mentioned!

@codefromthecrypt
Member

#2502 now includes test instructions; please give it a try.
