
Why is the data lost in elasticsearch? #1141

Closed
liangman opened this issue Jun 24, 2016 · 49 comments

@liangman commented Jun 24, 2016

I used zipkin, kafka, and elasticsearch for testing, with elasticsearch as a single node. There was no problem transferring the data from kafka to zipkin, but data is lost between zipkin and elasticsearch. I wrote 500000 log records to kafka, but only 212162 ended up in elasticsearch.
The data:

[{"traceId":"ffffffffffffffff","name":"fermentum","id":"ffffffffffffffff","annotations":[{"timestamp":1466386794757000,"value":"sr","endpoint":{"serviceName":"semper","ipv4":"113.29.89.129","port":2131}},{"timestamp":1466386794757000,"value":"sagittis","endpoint":{"serviceName":"semper","ipv4":"113.29.89.129","port":2131}},{"timestamp":1466386794758000,"value":"montes","endpoint":{"serviceName":"semper","ipv4":"113.29.89.129","port":2131}},{"timestamp":1466386794758000,"value":"augue","endpoint":{"serviceName":"semper","ipv4":"113.29.89.129","port":2131}},{"timestamp":1466386794759000,"value":"malesuada","endpoint":{"serviceName":"semper","ipv4":"113.29.89.129","port":2131}},{"timestamp":1466386794760000,"value":"ss","endpoint":{"serviceName":"semper","ipv4":"113.29.89.129","port":2131}}],"binaryAnnotations":[{"key":"mollis","value":"hendrerit","endpoint":{"serviceName":"semper","ipv4":"113.29.89.129","port":2131}}]}]

"ffffffffffffffff" was replaced in "1-5000000";
It was no regular!!!

@codefromthecrypt (Member) commented Jun 24, 2016

there's a "/metrics" endpoint. It would show how many spans were accepted by kafka on the zipkin-server side. can you send relevant output from that?

Also, I don't understand what you mean by

"ffffffffffffffff" was replaced with values from "1-500000";

Do you mean that when you post a span where the id has all bits set, it comes back not in hex?

@liangman (Author)

I set the id from 1-500000 in hex. I think that the index has a conflict.

@liangman (Author)

The data lost in elasticsearch is different every time I write the data to kafka.

@codefromthecrypt (Member)

Wondering if this has to do with the data being older? E.g. if I post this, I have to set lookback to a relatively high value to see it.

$ curl -s localhost:9411/api/v1/spans -X POST -H "Content-Type: application/json" --data '[
  {
    "traceId": "ffffffffffffffff",
    "name": "fermentum",
    "id": "ffffffffffffffff",
    "annotations": [
      {
        "timestamp": 1466386794757000,
        "value": "sr",
        "endpoint": {
          "serviceName": "semper",
          "ipv4": "113.29.89.129",
          "port": 2131
        }
      },
      {
        "timestamp": 1466386794757000,
        "value": "sagittis",
        "endpoint": {
          "serviceName": "semper",
          "ipv4": "113.29.89.129",
          "port": 2131
        }
      },
      {
        "timestamp": 1466386794758000,
        "value": "montes",
        "endpoint": {
          "serviceName": "semper",
          "ipv4": "113.29.89.129",
          "port": 2131
        }
      },
      {
        "timestamp": 1466386794758000,
        "value": "augue",
        "endpoint": {
          "serviceName": "semper",
          "ipv4": "113.29.89.129",
          "port": 2131
        }
      },
      {
        "timestamp": 1466386794759000,
        "value": "malesuada",
        "endpoint": {
          "serviceName": "semper",
          "ipv4": "113.29.89.129",
          "port": 2131
        }
      },
      {
        "timestamp": 1466386794760000,
        "value": "ss",
        "endpoint": {
          "serviceName": "semper",
          "ipv4": "113.29.89.129",
          "port": 2131
        }
      }
    ],
    "binaryAnnotations": [
      {
        "key": "mollis",
        "value": "hendrerit",
        "endpoint": {
          "serviceName": "semper",
          "ipv4": "113.29.89.129",
          "port": 2131
        }
      }
    ]
  }
]'
$ curl -s 'localhost:9411/api/v1/traces?serviceName=semper&lookback=5000000000'|jq .

@codefromthecrypt (Member)

I started a local server against elasticsearch and got the same output, e.g. the trace id returned the same as when not using elasticsearch.

$ SELF_TRACING_ENABLED=false STORAGE_TYPE=elasticsearch java -jar zipkin.jar 

@codefromthecrypt (Member)

@liangman we need some way of reproducing the problem, so maybe verify versions and see if you can reproduce something using POST like above. Our tests run latest zipkin against elasticsearch 2.2.1

@liangman (Author)

OK, it is normal when using "POST", but why is data lost when using "kafka + ES"?

@codefromthecrypt (Member)

great... glad to see progress. First step is to make sure that when
you say POST you mean POST + ES. That way, there's only one variable
changing, the transport. After you run your scenario, store the
collector metrics, which should show how many messages were sent, if
any were dropped etc.

ex. http://localhost:9411/metrics

https://github.com/openzipkin/zipkin/tree/master/zipkin-server#collector

Once you verify this, change only the transport variable and run the
same scenario (i.e. report using Kafka, not HTTP).

look at the /metrics endpoint and compare the kafka stats with the
http stats from the prior pass. you can also run the server with the
argument --logging.level.zipkin=DEBUG commandline argument

https://github.com/openzipkin/zipkin/tree/master/zipkin-server#logging

You might see dropped spans or dropped messages in the metrics output,
and you might see exceptions in the log output. This is how we can
start progressing from here.

@liangman (Author)

Of course, it was "POST + ES".
Following what you said, I find that it gives the same result.
kafka:
"counter.zipkin_collector.messages.kafka":100,"gauge.zipkin_collector.message_spans.kafka":1.0,"gauge.zipkin_collector.message_bytes.kafka":4068.0,"counter.zipkin_collector.spans.kafka":100,"counter.zipkin_collector.bytes.kafka":406770,"httpsessions.max":-1,"httpsessions.active":0}
http:
"gauge.zipkin_collector.message_bytes":1569.0,"counter.zipkin_collector.spans":100,"gauge.zipkin_collector.message_spans":1.0,"counter.zipkin_collector.messages.http":100,"counter.zipkin_collector.bytes":156870,"gauge.response.api.v1.spans":2.0,"counter.status.202.api.v1.spans":100,"httpsessions.max":-1,"httpsessions.active":0}
And I ran zipkin with the --logging.level.zipkin=DEBUG argument, but the log looks normal.

@liangman (Author)

I sent the same 100 records via kafka and via http.
kafka + es:
paas@PaasAPMBootstrap:/var/paas$ curl -XGET '129.188.37.108:9200/_cat/indices?v'
health status index pri rep docs.count docs.deleted store.size pri.store.size
yellow open zipkin-2016-06-20 5 1 114 0 8.8kb 8.8kb

http+es:
paas@PaasAPMBootstrap:/var/paas$ curl -XGET '129.188.37.108:9200/_cat/indices?v'
health status index pri rep docs.count docs.deleted store.size pri.store.size
yellow open zipkin-2016-06-20 5 1 200 0 20.4kb 20.4kb

@codefromthecrypt (Member) commented Jun 25, 2016 via email

@liangman (Author) commented Jun 25, 2016

senddata.sh:

#!/bin/bash
TIMESTAMP=$(node -e 'console.log(new Date().getTime())')000
curl -s localhost:9411/api/v1/spans -X POST -H "Content-Type: application/json" --data '[{
    "traceId": "'${1}'",
    "name": "fermentum",
    "id": "'${1}'",
    "annotations": [
      {
        "timestamp": '${TIMESTAMP}',
        "value": "sr",
        "endpoint": {
          "serviceName": "semper",
          "ipv4": "113.29.89.129",
          "port": 2131
        }
      },
      {
        "timestamp": '${TIMESTAMP}',
        "value": "sagittis",
        "endpoint": {
          "serviceName": "semper",
          "ipv4": "113.29.89.129",
          "port": 2131
        }
      },
      {
        "timestamp": '${TIMESTAMP}',
        "value": "montes",
        "endpoint": {
          "serviceName": "semper",
          "ipv4": "113.29.89.129",
          "port": 2131
        }
      },
      {
        "timestamp": '${TIMESTAMP}',
        "value": "augue",
        "endpoint": {
          "serviceName": "semper",
          "ipv4": "113.29.89.129",
          "port": 2131
        }
      },
      {
        "timestamp": '${TIMESTAMP}',
        "value": "malesuada",
        "endpoint": {
          "serviceName": "semper",
          "ipv4": "113.29.89.129",
          "port": 2131
        }
      },
      {
        "timestamp": '${TIMESTAMP}',
        "value": "ss",
        "endpoint": {
          "serviceName": "semper",
          "ipv4": "113.29.89.129",
          "port": 2131
        }
      }
    ],
    "binaryAnnotations": [
      {
        "key": "mollis",
        "value": "hendrerit",
        "endpoint": {
          "serviceName": "semper",
          "ipv4": "113.29.89.129",
          "port": 2131
        }
      }
    ]
  }]'

@liangman (Author) commented Jun 25, 2016

start_send.sh:

#!/bin/bash

i=1

while [ $i -lt $1 ]
        do
        echo $i
        ./senddata.sh `printf "%x" $i`
        let "i=${i}+1"
done

@liangman (Author)

./start_send.sh 101
This is the script for the "POST + ES" case.

@liangman (Author)

I use Java to write the data to Kafka, so I don't know how to post it the same way. But I can write the script in Python. Please wait a moment.

@codefromthecrypt (Member) commented Jun 25, 2016

Maybe you can send the same JSON you send via HTTP using something like this?

$ kafka-console-producer.sh --broker-list $ADVERTISED_HOST:9092 --topic zipkin
[{"traceId":"1","name":"bang","id":"2","timestamp":1234,"binaryAnnotations":[{"key":"lc","value":"bamm-bamm","endpoint":{"serviceName":"flintstones","ipv4":"127.0.0.1"}}]}]

https://github.com/openzipkin/zipkin/tree/master/zipkin-collector/kafka#json

@liangman (Author) commented Jun 25, 2016

senddatatokafka.py:

#!/bin/python
import time
import os

data = """[{
"traceId": "%x",
"name": "fermentum",
"id": "%x",
"annotations": [
{
"timestamp": %i,
"value": "sr",
"endpoint": {
"serviceName": "semper",
"ipv4": "113.29.89.129",
"port": 2131
}
},
{
"timestamp": %i,
"value": "sagittis",
"endpoint": {
"serviceName": "semper",
"ipv4": "113.29.89.129",
"port": 2131
}
},
{
"timestamp": %i,
"value": "montes",
"endpoint": {
"serviceName": "semper",
"ipv4": "113.29.89.129",
"port": 2131
}
},
{
"timestamp": %i,
"value": "augue",
"endpoint": {
"serviceName": "semper",
"ipv4": "113.29.89.129",
"port": 2131
}
},
{
"timestamp": %i,
"value": "malesuada",
"endpoint": {
"serviceName": "semper",
"ipv4": "113.29.89.129",
"port": 2131
}
},
{
"timestamp": %i,
"value": "ss",
"endpoint": {
"serviceName": "semper",
"ipv4": "113.29.89.129",
"port": 2131
}
}
],
"binaryAnnotations": [
{
"key": "mollis",
"value": "hendrerit",
"endpoint": {
"serviceName": "semper",
"ipv4": "113.29.89.129",
"port": 2131
}
}
]
}]"""
def main():
  count = 100
  # strip spaces/newlines so each JSON span list fits on a single producer input line
  data1 = data.replace(' ', '').replace('\n', '')
  cmdp = r'./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic zipkin'
  pipp = os.popen(cmdp, 'w')
  i = 0
  while i < count:
    i += 1
    timestamp = time.time() * 10 ** 6  # epoch micros
    pipp.write(data1 % (i, i, timestamp, timestamp, timestamp, timestamp, timestamp, timestamp) + "\r\n")
    #print data1 % (i, i, timestamp, timestamp, timestamp, timestamp, timestamp, timestamp)
  print 'finish!'
  pipp.close()

if __name__ == '__main__':
  main()
python senddatatokafka.py

@liangman (Author)

Because I remove the " " and "\n" before sending to Kafka. I think you can try it with the script.

@codefromthecrypt (Member) commented Jun 25, 2016

OK so I've verified that with the following setup, I get readbacks between 58 and 100 spans when using the kafka script vs the http one which routinely reads back 100.

What I do is run the scenarios below while kafka is left up, but elasticsearch is cleaned between runs.

The instructions below run kafka and ES:

# start kafka
$ curl -SL http://www.us.apache.org/dist/kafka/0.8.2.2/kafka_2.11-0.8.2.2.tgz | tar xz
$ nohup bash -c "cd kafka_* && bin/zookeeper-server-start.sh config/zookeeper.properties >/dev/null 2>&1 &"
$ nohup bash -c "cd kafka_* && bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &"

# start ES
$ curl -SL https://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.2.1/elasticsearch-2.2.1.tar.gz | tar xz
$ elasticsearch-*/bin/elasticsearch -d > /dev/null

And.. I start zipkin-server like so..

SELF_TRACING_ENABLED=false KAFKA_ZOOKEEPER=localhost:2181 STORAGE_TYPE=elasticsearch java -jar zipkin.jar --logging.level.zipkin=DEBUG

HTTP

When I run the HTTP test like this:

for i in {1..100}; do ./senddata.sh `printf "%x" $i`; done

I get these collector metrics:

  "gauge.zipkin_collector.message_spans.http": 1,
  "counter.zipkin_collector.spans.http": 100,
  "gauge.zipkin_collector.message_bytes.http": 1569,
  "counter.zipkin_collector.messages.http": 100,
  "counter.zipkin_collector.bytes.http": 156870,

And the api count looks correct:

$ curl -s 'localhost:9411/api/v1/traces?lookback=500000000&limit=100'|jq '. | length'
100

Kafka

When I run the Kafka test like this:

$ python senddatatokafka.py 
Java HotSpot(TM) 64-Bit Server VM warning: Option UseParNewGC was deprecated in version 9.0 and will likely be removed in a future release.
[2016-06-25 14:11:06,504] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
finsh!
[2016-06-25 14:11:06,676] WARN Error while fetching metadata [{TopicMetadata for topic zipkin -> 
No partition metadata for topic zipkin due to kafka.common.LeaderNotAvailableException}] for topic [zipkin]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2016-06-25 14:11:06,684] WARN Error while fetching metadata [{TopicMetadata for topic zipkin -> 
No partition metadata for topic zipkin due to kafka.common.LeaderNotAvailableException}] for topic [zipkin]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2016-06-25 14:11:06,684] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: zipkin (kafka.producer.async.DefaultEventHandler)

I get these collector metrics:

  "gauge.zipkin_collector.message_bytes.kafka": 903,
  "counter.zipkin_collector.bytes.kafka": 90270,
  "counter.zipkin_collector.spans.kafka": 100,
  "gauge.zipkin_collector.message_spans.kafka": 1,
  "counter.zipkin_collector.messages.kafka": 100,

And the api count looks correct sometimes, and not others (the stats always look the same):

$ curl -s 'localhost:9411/api/v1/traces?lookback=500000000&limit=100'|jq '. | length'
100

@codefromthecrypt (Member)

NEXT STEP:

One difference between in-memory storage and ES storage is that the former doesn't do anything asynchronously. We should validate this scenario against Cassandra, too (as it also uses guava futures).

@liangman (Author)

OK, I will try it again following your steps.

@codefromthecrypt (Member)

Might be an issue in cassandra, too, but looks like #1142 is blocking my ability to use the normal readback (only returns 10-16)

@liangman (Author)

No, when I send 100 records to Kafka, all of them are written to Cassandra. But I only see 10 of them on the Zipkin page. Maybe there is a bug in the code...

@codefromthecrypt (Member)

@liangman I edited the comment for the http script. can you edit the one for kafka and make sure that timestamps are reset each time (using epoch micros)?

@codefromthecrypt (Member)

NEXT STEP:

See the behavior when the kafka script reports spans with unique timestamps. For example TIMESTAMP=$(node -e 'console.log(new Date().getTime())')000. I don't really expect this to make a difference, but we ought to be consistent.

A step after that would be to instrument ElasticsearchSpanConsumer in some way that lets us track the futures (possibly ensuring the result has the correct numberOfActions() etc). This might be hard to track down, but at least the scenario is repeatable.
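
For illustration, something along these lines could be wired in (a hedged sketch using Guava's FutureCallback and the ES 2.x BulkResponse API, not the actual consumer code; the class name is made up):

// Sketch only: log what each bulk future actually did, so dropped spans become visible.
// Assumes the consumer exposes its bulk result as a Guava ListenableFuture<BulkResponse>.
import com.google.common.util.concurrent.FutureCallback;
import org.elasticsearch.action.bulk.BulkResponse;

final class BulkResponseLogger implements FutureCallback<BulkResponse> {
  final int expectedActions; // pass bulkRequest.numberOfActions() before executing

  BulkResponseLogger(int expectedActions) {
    this.expectedActions = expectedActions;
  }

  @Override public void onSuccess(BulkResponse response) {
    if (response.hasFailures()) {
      System.err.println("bulk had failures: " + response.buildFailureMessage());
    }
    if (response.getItems().length != expectedActions) {
      System.err.println("expected " + expectedActions + " items, got " + response.getItems().length);
    }
  }

  @Override public void onFailure(Throwable t) {
    System.err.println("bulk request failed outright: " + t);
  }
}

// usage sketch: Futures.addCallback(bulkFuture, new BulkResponseLogger(bulkRequest.numberOfActions()));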

ps I'm offline likely the rest of the day, but might look into this tomorrow.

@codefromthecrypt (Member)

sure.. I'd like you to edit your comment here #1141 (comment)

In the span you are generating in python, please make it have timestamps based on the current system time.

That reduces the work needed when querying and also avoids TTL considerations. For example, you can look at how I edited the http script.

#1141 (comment)

after that you can try to troubleshoot ElasticsearchSpanConsumer by customizing the class, building and running locally. https://github.com/openzipkin/zipkin/tree/master/zipkin-server#running-locally
For example, you could add print statements etc.

If you aren't familiar enough to do that, you'd likely need to wait until I have more time to help (or someone else does).

@liangman (Author)

I have edited the script again. It may meet your requirements.

@codefromthecrypt (Member)

thanks. will take a look today


@codefromthecrypt (Member)

Update:

Adding a 100ms sleep between kafka messages seems to avoid this issue.. There are no errors, but something seems to drop when processing concurrently.

@codefromthecrypt (Member)

Note: synchronizing ElasticsearchSpanConsumer doesn't make the problem go away.. there's something else going on that sleeping 100ms between creating each future avoids.

@codefromthecrypt (Member)

I've found a workaround. This happens when a bulk request is used for a single request (only 1 span). When I special-case to not use bulk requests when there's only 1 span, the problem disappears.

I've two remedies:

  • I'll adjust the code to special-case storage when only one span exists (a rough sketch of the shape follows below)
  • consider sending more than one span per kafka message (note that @prat0318 notices this increases throughput anyway).

We need to create a soak test at some point, too, as the special-casing may not be fixing the root cause, even if it helps. cc @anuraaga
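
Roughly, the first remedy has this shape (a sketch of the idea against the ES 2.x client API, not the literal patch; the class name and the pre-encoded span parameters are illustrative):

// Sketch: skip the Bulk API when there is only one span to index.
// 'client', 'index', and the JSON-encoded spans stand in for the real fields in ElasticsearchSpanConsumer.
import java.util.List;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.client.Client;

final class SpanIndexer {
  static void storeSpans(Client client, String index, List<byte[]> jsonSpans) {
    if (jsonSpans.size() == 1) {
      // single span: a plain index request, no bulk machinery involved
      client.prepareIndex(index, "span").setSource(jsonSpans.get(0)).execute();
      return;
    }
    BulkRequestBuilder bulk = client.prepareBulk();
    for (byte[] json : jsonSpans) {
      bulk.add(client.prepareIndex(index, "span").setSource(json));
    }
    bulk.execute();
  }
}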

@prat0318 commented Jun 26, 2016

@adriancole I am a bit confused. How come the way messages from kafka are read can affect something specific to Elasticsearch? If I am not wrong, the flow is kafka -> collector -> ES/C*. From what I read, message loss is only seen for ES and not C*. But the transport (kafka/http) work is done once the data is fetched via the collector. So how is the issue related to kafka batching plus ES storage?

@codefromthecrypt (Member)

From what I read, message loss is only seen for ES and not C*. But the transport (kafka/http) work is done once the data is fetched via collector. So how is the issue related to kafka batching plus ES storage?

I think http only worked because the test is slower. For example,
sleeping 100ms in kafka loop also succeeded.

TL;DR; I would recommend the bundling feature to anyone who seems like
they are writing tests or instrumentation. It isn't about this issue
specifically, more about encouraging supportable practice.

Buffering and bundling spans ends up using
AsyncSpanConsumer.accept(List...) as it was written to be used. At the
moment, the only way to control the count of spans stored at a time by
zipkin is to change the instrumentation/reporters to send more than
one span per message. I recall a discussion of adding a buffering
layer internally to the kafka code, but that didn't go anywhere.

We've regularly encountered issues with not bundling across different
transports and storage backends.. to the point where we changed the
standard format to be a list (the storage api was always a list even
in scala). You spent a lot of time learning that bundling helps a few
months back, but this isn't a c* only concern. ElasticSearch was
written with the assumption that storing lists is the common case..
else it wouldn't have used Bulk operations in the first place. This is
the same thing with MySQL and likely will end up the way for C* at
some point. We really ought to encourage bundling as a standard
feature for reporting spans regardless if the bundle size policy will
be transport-specific.

The other option is to see a practice, like dumping many messages at
the same time, and say nothing. That person might go production etc
assuming span-per-message is fine.. to a point where it is hard to
change their instrumentation. Maybe they never considered bundling at
all. Would they have the experience and time you did to troubleshoot
and refactor late in deployment? Would someone be available for free
OSS support for several days? Would folks want to hack zipkin to
buffer on behalf of them? Would that debate finish quick enough and
with the desired outcome to resolve the issue? Maybe to all, but I'd
rather raise a flag about a known stick than get smacked with it
later.

Truth is, we can't count on volunteers to do free support, design and
dev work on-demand.. so we have to work in ways that are likely to use
the limited time we have to help the highest amount of users. When
folks have bundling in mind from the beginning, it can be adjusted
when they get into a support problem, or in their test scenario. They
can solve or work around more problems without us.

codefromthecrypt pushed a commit that referenced this issue Jun 26, 2016
During a test where 100 single-span messages are sent to Kafka at the
same time, I noticed only 58-97 of them would end up in storage even
though all messages parsed properly and no operations failed.

After a 100ms/message pause was added, the store rate of this test went
to 100%, so I figured it was some sort of state issue. I noticed the code
was using Bulk operations regardless of input size, so as a wild guess I
changed it to special-case single-span messages. At least in this test,
that raised the success rate to 100% without any pausing needed.

I don't know why this worked, but it seems sensible to not use bulk apis
when there's no bulk action to perform.

I started to write a unit test to validate single-length lists don't use
bulk, but the Mockito involved became too verbose as the Elasticsearch
client uses chaining and other patterns that are tedious to mock.

Instead, we should make a parallel integration test and apply them to
all storage components.

See #1141
@codefromthecrypt (Member)

Here's the workaround.. when merged, it should be testable from a snapshot (of course, if you are in a position to build the branch, please do):

#1146

@liangman (Author) commented Jun 27, 2016

Ok, I have updated the file ElasticsearchSpanConsumer.java.

count = 250

But when I set the count of log records sent to Kafka to 250, there is a warning here:

2016-06-27 11:33:08.106  WARN 59971 --- [[listener][T#3]] zipkin.collector.kafka.KafkaCollector    : Cannot store spans [00000000000000e3.00000000000000e3<:00000000000000e3] due to EsRejectedExecutionException(rejected execution of org.elasticsearch.transport.TransportService$4@53793b3c on EsThreadPoolExecutor[index, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@33aec4eb[Running, pool size = 4, active threads = 4, queued tasks = 200, completed tasks = 671]])

org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution of org.elasticsearch.transport.TransportService$4@53793b3c on EsThreadPoolExecutor[index, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@33aec4eb[Running, pool size = 4, active threads = 4, queued tasks = 200, completed tasks = 671]]
    at org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution(EsAbortPolicy.java:50) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) [na:1.8.0_73]
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) [na:1.8.0_73]
    at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.execute(EsThreadPoolExecutor.java:85) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at org.elasticsearch.transport.TransportService.sendLocalRequest(TransportService.java:346) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at org.elasticsearch.transport.TransportService.sendRequest(TransportService.java:310) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at org.elasticsearch.action.support.replication.TransportReplicationAction$ReroutePhase.performAction(TransportReplicationAction.java:463) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at org.elasticsearch.action.support.replication.TransportReplicationAction$ReroutePhase.doRun(TransportReplicationAction.java:444) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at org.elasticsearch.action.support.replication.TransportReplicationAction.doExecute(TransportReplicationAction.java:125) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at org.elasticsearch.action.index.TransportIndexAction.innerExecute(TransportIndexAction.java:134) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at org.elasticsearch.action.index.TransportIndexAction.doExecute(TransportIndexAction.java:118) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at org.elasticsearch.action.index.TransportIndexAction.doExecute(TransportIndexAction.java:65) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:70) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at org.elasticsearch.action.support.replication.TransportReplicationAction$OperationTransportHandler.messageReceived(TransportReplicationAction.java:238) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at org.elasticsearch.action.support.replication.TransportReplicationAction$OperationTransportHandler.messageReceived(TransportReplicationAction.java:235) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at org.elasticsearch.transport.netty.MessageChannelHandler.handleRequest(MessageChannelHandler.java:244) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at org.elasticsearch.transport.netty.MessageChannelHandler.messageReceived(MessageChannelHandler.java:114) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) ~[netty-3.10.5.Final.jar!/:na]
    at org.elasticsearch.common.netty.OpenChannelsHandler.handleUpstream(OpenChannelsHandler.java:75) ~[elasticsearch-2.3.2.jar!/:2.3.2]
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) ~[netty-3.10.5.Final.jar!/:na]
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) ~[netty-3.10.5.Final.jar!/:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_73]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_73]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_73]
curl -s 'localhost:9411/api/v1/traces?lookback=500000000&limit=100'|jq '. | length'
100

@liangman (Author)

The result in ES:

health status index             pri rep docs.count docs.deleted store.size pri.store.size 
yellow open   zipkin-2016-06-27   5   1        414            0    147.1kb        147.1kb 

@liangman reopened this Jun 27, 2016
@codefromthecrypt (Member)

@liangman so let's first make sure we know what does work and what doesn't. Are you saying 100 works, but if you send 250 messages at the same time, you get that rejected exception?

Since the error includes a capacity of 200, again I'm wondering what would happen if instead of doing 250 spans w/ 250 messages, you batched them as 10 or more per message (e.g. the JSON array includes 10 items, not 1).
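
For illustration, a batched sender could look something like this (a rough sketch using the plain kafka-clients Java producer; the broker address, loop counts, and the minimal span JSON are placeholders for your real setup):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BatchedSpanSender {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    int spansPerMessage = 10;
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      for (int batch = 0; batch < 25; batch++) { // 25 messages x 10 spans = 250 spans
        StringBuilder json = new StringBuilder("[");
        for (int i = 0; i < spansPerMessage; i++) {
          long id = batch * spansPerMessage + i + 1;
          long micros = System.currentTimeMillis() * 1000; // epoch micros
          if (i > 0) json.append(',');
          // minimal span for illustration; the full span JSON from the python script goes here
          json.append("{\"traceId\":\"").append(Long.toHexString(id))
              .append("\",\"name\":\"fermentum\",\"id\":\"").append(Long.toHexString(id))
              .append("\",\"timestamp\":").append(micros).append('}');
        }
        json.append(']');
        // one kafka message now carries a JSON list of 10 spans
        producer.send(new ProducerRecord<>("zipkin", json.toString()));
      }
    }
  }
}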

ps here's the latest snapshot, if it helps http://oss.jfrog.org/oss-snapshot-local/io/zipkin/java/zipkin-server/1.1.6-SNAPSHOT/zipkin-server-1.1.6-20160627.030409-3-exec.jar

@codefromthecrypt (Member)

Back to the error.. Right now the index queue depth is 200 and you've overrun it. That means requests are being accepted faster than they can be processed.

I think it will be useful to adjust the spans/message count to see if you can make storage more efficient with the same topology.

There are a lot of reasons it could be backed up, including the usual cpu, mem, and network bottlenecks. It could even be backed up by a slow cluster.. We won't be able to troubleshoot what the bottleneck is in your environment; suffice to say you've hit a limit.

From an ES point of view, you can look at the tuning options there. Maybe start with this https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-threadpool.html and run experiments until you've found the key parameters that help.

@anuraaga (Contributor) commented Jun 27, 2016

FWIW, it is possible to adjust the default queue size through elasticsearch.yml or cluster settings if you need
https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-threadpool.html

...though Adrian beat me to it ;)

But as Adrian said, it seems like you are having issues with a highly-synthetic workload, not a real-world one. If this synthetic workload is important to you, then I'd recommend trying to modify the cluster settings, but otherwise would recommend trying to send spans in batch instead (as the Java brave would). And keep in mind, Elasticsearch is IMO a medium-scale datastore. It's not going to get reasonable performance on a single node (this config is only for testing / development) and it's not unlikely you'd run into perf issues with it.

@liangman (Author)

Maybe I have to consider changing the database from ES to Cassandra,
because we forecast that at least 100 thousand log records will be sent to Kafka per second by our microservices.
So I will need a distributed Zipkin for consuming the data.

@liangman (Author)

When I use Cassandra for storage, it takes about 100s for Zipkin to consume 500000 records (about 1kb each).

@codefromthecrypt (Member) commented Jun 27, 2016 via email

@codefromthecrypt (Member)

I'm closing this issue for now as we've progressed from no-errors and dropped data, to errors that explain why data was dropped (overran queue length), and an ES storage config suggestion to improve that.

@liangman (Author)

ok.

@codefromthecrypt (Member)

@liangman by the way, thanks: your script allowed us to reproduce the problem and convert it from a mystery into an explanation. The next users will be better off for your efforts.

@xqliang commented Jun 12, 2017

I use the latest Zipkin (1.26) with a single-node Elasticsearch (2.3.4) for storage (the default docker-zipkin configuration), and still encounter data loss.

Here is the test script (sendtozipkin.sh):

#!/bin/bash

function send_to_zipkin() {
    id=$1
    id2=$2
    millis=$(python -c 'import time; print "%d" % (time.time() * 1000 * 1000)')  # epoch micros, despite the variable name
    curl localhost:9411/api/v1/spans -X POST -H "Content-Type: application/json" --data '[{
        "traceId": "'${id}'",
        "name": "fermentum",
        "id": "'${id}'",
        "annotations": [
          {
            "timestamp": '${millis}',
            "value": "sr",
            "endpoint": {
              "serviceName": "semper",
              "ipv4": "113.29.89.129",
              "port": 2131
            }
          }
        ]
    }, {
        "traceId": "'${id}'",
        "name": "fermentum1",
        "id": "'${id2}'",
        "annotations": [
         {
            "timestamp": '${millis}',
            "value": "sr",
            "endpoint": {
              "serviceName": "semper",
              "ipv4": "113.29.89.129",
              "port": 2131
            }
          }
        ]
    }]'
}
i=0

while [ $i -lt $1 ]; do
    let "i=${i}+1"
    let "j=${i}+16777216"
    echo $i
    send_to_zipkin `printf "%016x" $i` `printf "%016x" $j`
done

Then send 100 (50 * 2) spans to Zipkin:

$ ./sendtozipkin.sh 50

And some random messages are lost. I added the ES_HTTP_LOGGING=BODY environment variable (introduced in Zipkin 1.25) to docker-compose-elasticsearch.yaml and saw these errors:

# grep 'errors\\":true' $(docker inspect --format='{{.LogPath}}' zipkin) | head -1
{"log":"2017-06-12 03:04:29.733  INFO 6 --- [41.133:9200/...] z.s.e.http.ElasticsearchHttpStorage      : 
{\"took\":1,\"errors\":true,\"items\":[{\"create\":{\"_index\":\"zipkin-2017-06-03\",\"_type\":\"span\",
\"_id\":\"AVyaQoUkPGYSeXOvPsRT\",\"status\":429,\"error\":{\"type\":\"es_rejected_execution_exception\",
\"reason\":\"rejected execution of org.elasticsearch.transport.TransportService$4@5742b77e on EsThreadPoolExecutor[bulk, queue capacity = 50, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@2fcaec02[Running, pool size = 8, active threads = 8, queued tasks = 50, completed tasks = 0]]\"}}},
{\"index\":{\"_index\":\"zipkin-2017-06-03\",\"_type\":\"servicespan\",
\"_id\":\"gamerebategift|timeraddpoolamounttask\",\"status\":429,\"error\":{\"type\":\"es_rejected_execution_exception\",
\"reason\":\"rejected execution of org.elasticsearch.transport.TransportService$4@70fc3bfe on EsThreadPoolExecutor[bulk, queue capacity = 50, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@2fcaec02[Running, pool size = 8, active threads = 8, queued tasks = 50, completed tasks = 0]]\"}}}]}\n",
"stream":"stdout","time":"2017-06-12T03:04:29.733933728Z"}

Check the default configuration:

$ curl -XGET localhost:9200/_cluster/settings
{"persistent":{},"transient":{}}

Change threadpool.bulk.queue_size to 500:

$ curl -XPUT localhost:9200/_cluster/settings -d '{"transient": {"threadpool.bulk.queue_size": 500}}'
{"acknowledged":true,"persistent":{},"transient":{"threadpool":{"bulk":{"queue_size":"500"}}}}

Then I reran the sendtozipkin.sh script, and no data was lost any more.

@codefromthecrypt (Member) commented Jun 12, 2017 via email

@xqliang commented Jun 12, 2017

Yes, at the least Zipkin should log a WARNING/ERROR when a write to ES comes back with errors, even without ES_HTTP_LOGGING=BODY set, so we can monitor the log for alerting.
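
Something along these lines would be enough to surface partial bulk failures (a minimal sketch using Jackson and SLF4J, not Zipkin's actual code; the class and method names are made up):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class BulkErrorCheck {
  static final Logger LOG = LoggerFactory.getLogger(BulkErrorCheck.class);
  static final ObjectMapper MAPPER = new ObjectMapper();

  // Logs a warning for each item an Elasticsearch _bulk response reports as failed.
  static void warnOnPartialFailure(String bulkResponseJson) throws IOException {
    JsonNode root = MAPPER.readTree(bulkResponseJson);
    if (!root.path("errors").asBoolean(false)) return; // nothing failed
    for (JsonNode item : root.path("items")) {
      JsonNode op = item.elements().next(); // the "index" or "create" wrapper object
      int status = op.path("status").asInt();
      if (status >= 400) {
        LOG.warn("bulk item failed: status={} error={}", status, op.path("error"));
      }
    }
  }
}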

@codefromthecrypt (Member) commented Jun 12, 2017 via email
