Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix hight cpu load when client communicate with broker error #1284

Closed
wants to merge 1 commit into from

Conversation

qiyan2019
Copy link

No description provided.

@qiyan2019
Copy link
Author

we use KafkaProducer to send message to kafka cluster, when cluster one node reject our connection (iptable/network incomming overflow ), if you continue send message to the partion on the failed node, producer sender thread will in blank loop and the cpu load will be very high

@Ormod
Copy link
Contributor

Ormod commented Nov 14, 2017

We've seen something similar happen. At times Kafka producer processes start consuming 100% CPU. After a restart the cpu usage goes to a normal, very low percentage.

Any comments on of if this PR would be going in any time soon?

@dpkp
Copy link
Owner

dpkp commented Nov 14, 2017

The sender code implements the logic in the java client almost verbatim and I'm not inclined to make changes without a deeper understanding of the issue. If there is a change required, I'd like to get more context and understand: (1) does the java client also have this flaw? (2) if not, what are we doing differently that might cause it?

@qiyan2019
Copy link
Author

Java client does not have this flaw, the simple example code is here.
Just use iptables drop all tcp packet on one of the kafka node.

Chain INPUT (policy ACCEPT)
target     prot opt source               destination
DROP       all  --  10.111.67.239        0.0.0.0/0

Here is Python demo Code.

#!/usr/bin/env python
#-*- coding: utf-8 -*-

from kafka import KafkaProducer
from kafka.partitioner.default import DefaultPartitioner
import time
import uuid

import logging

import logging

console = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s %(name)-12s: %(levelname)-8s %(message)s')
console.setFormatter(formatter)
console.setLevel(logging.DEBUG)
logging.root.addHandler(console)
logging.root.setLevel(logging.DEBUG)


producer = KafkaProducer(bootstrap_servers=['10.111.101.160:9092'],retries=10,max_block_ms=1,request_timeout_ms=1000)

print producer.partitions_for('lwb-test-topic1')

while True:
    ret = producer.send('lwb-test-topic1', key=str(uuid.uuid1()), value=b'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb')
    time.sleep(11)

Here is Java demo code

package com.frtmelody;

import java.util.*;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.*;
import org.apache.log4j.PropertyConfigurator;


public class Main {
    private static Logger logger = Logger.getLogger(Main.class);
    public static void main(String[] args) {
        PropertyConfigurator.configure("log4j.properties");
        long events = Long.parseLong(args[0]);
        long sleep = Long.parseLong(args[1]);
        logger.debug("events num" + args[0] + " sleep " + args[1]);

        Properties props = new Properties();
        props.put("bootstrap.servers", "10.111.101.160:9092");
        props.put("acks", "all");
        props.put("retries", 10);
        props.put("request.timeout.ms", 1000);
        props.put("timeout.ms", 100);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);

        for (long nEvents = 0; nEvents < events; nEvents++) {
            String msg = "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb";
            Future<RecordMetadata> future= producer.send(new ProducerRecord<String, String>("lwb-test-topic1",  UUID.randomUUID().toString(), msg));
            try {
                Thread.sleep(sleep);
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
        producer.close();
    }
}

@qiyan2019
Copy link
Author

any process?

@dpkp
Copy link
Owner

dpkp commented Dec 5, 2017

Apologies -- all test fixtures were failing due to an unrelated issue. Please pull latest changes from master and re-run tests.

@jeffwidman
Copy link
Collaborator

jeffwidman commented Feb 21, 2018

@frtmelody are you planning to rebase this?

@dpkp
Copy link
Owner

dpkp commented Mar 8, 2018

I looked into this a bit more and I believe it is simply a bug in BrokerConnection.connection_delay() -- which is expected to return milliseconds but currently returns seconds. I'd rather fix that issue and not introduce a divergent logic branch. Will put up a separate PR.

@dpkp dpkp closed this Mar 8, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants