
Test failures for console producer/consumer with min.insync.replicas=2 #116

Closed
allquantor opened this issue Jan 7, 2018 · 40 comments

@allquantor commented Jan 7, 2018

Running the Kafka tests, this one does not complete:

NAME                    READY     STATUS    RESTARTS   AGE
kafkacat-4kvj4          3/3       Running   0          1h
produce-consume-kdd92   2/3       Running   0          1h

Logs from Testcase

Nothing consumed yet
Nothing consumed yet
Nothing consumed yet
Nothing consumed yet
Nothing consumed yet
Nothing consumed yet
Nothing consumed yet
....

Could it be that the producer does not generate enough data? There are no errors in the producer or the consumer.

Using Kubernetes 1.8.5 on GKE.
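For reference, the per-container logs of the test pod can be pulled like this (the container names are assumed from the test manifests):

kubectl --namespace=test-kafka logs produce-consume-kdd92 testcase
kubectl --namespace=test-kafka logs produce-consume-kdd92 producer
kubectl --namespace=test-kafka logs produce-consume-kdd92 consumer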

@StevenACoffman commented Jan 8, 2018

@solsson Actually, I synced with latest recently and started getting the same test failures, whereas I didn't with this repo as pulled on Nov 12, 2017.

Not sure if it's related, but I have a Java consumer that now keeps getting the coordinator marked as dead.

 teachers/splitterator-pure-749947f75f-r8d5k[splitterator]: [pool-1-thread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=splitterator-pure2] (Re-)joining group
teachers/splitterator-pure-749947f75f-r8d5k[splitterator]: [pool-1-thread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=splitterator-pure2] Marking the coordinator kafka-1.broker.kafka.svc.cluster.local:9092 (id: 2147483646 rack: null) dead
teachers/splitterator-pure-749947f75f-r8d5k[splitterator]: [pool-1-thread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=splitterator-pure2] Discovered coordinator kafka-1.broker.kafka.svc.cluster.local:9092 (id: 2147483646 rack: null)
teachers/splitterator-pure-749947f75f-r8d5k[splitterator]: [pool-1-thread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=splitterator-pure2] (Re-)joining group

Previously, the identical Java consumer worked fine. I haven't figured this out yet.
Kubernetes 1.8.4 on AWS via kops.

@solsson commented Jan 8, 2018

Looks bad. I'm sorry about leaving master relatively untested. It could be the cause of #114 too. I've been caught up not only in the holiday season but also in #101 (comment).

I wrote these tests while working on new features, but now that they run constantly I find the readiness indication very useful. I will try to reproduce the above issues.

@StevenACoffman

If I revert to d01c128, from before #107, the tests (eventually) go back to:

kubectl get pods -l test-type=readiness --namespace=test-kafka
NAME                    READY     STATUS    RESTARTS   AGE
kafkacat-pflfs          3/3       Running   2          22m
produce-consume-xjbmg   3/3       Running   0          22m

Not sure what's causing the problem though.

@StevenACoffman commented Jan 8, 2018

Hrm, even with the tests succeeding on the old d01c128, the Yahoo kafka-manager add-on is failing to do much of anything:

[info] k.m.a.KafkaManagerActor - baseZkPath=/kafka-manager
[info] o.a.z.ClientCnxn - Socket connection established to zookeeper.kafka.svc.cluster.local/100.70.176.112:2181, initiating session
[info] o.a.z.ClientCnxn - Unable to read additional data from server sessionid 0x0, likely server has closed socket, closing socket connection and attempting reconnect
[info] play.api.Play - Application started (Prod)
[info] p.c.s.NettyServer - Listening for HTTP on /0:0:0:0:0:0:0:0:80
[info] o.a.z.ClientCnxn - Opening socket connection to server zookeeper.kafka.svc.cluster.local/100.70.176.112:2181. Will not attempt to authenticate using SASL (unknown error)
[info] o.a.z.ClientCnxn - Socket connection established to zookeeper.kafka.svc.cluster.local/100.70.176.112:2181, initiating session
[info] o.a.z.ClientCnxn - Unable to read additional data from server sessionid 0x0, likely server has closed socket, closing socket connection and attempting reconnect

This is weird, since I can do this:

kubectl exec -it zoo-0 -n kafka -- /bin/bash
root@zoo-0:/opt/kafka# echo ruok | nc -w 1 -q 1 zookeeper.kafka.svc.cluster.local 2181
imokroot@zoo-0:/opt/kafka#

If I go to the actual kafka-manager (via dashboard /api/v1/namespaces/kafka/services/kafka-manager/proxy/):

Yikes! Ask timed out on [ActorSelection[Anchor(akka://kafka-manager-system/), Path(/user/kafka-manager)]] after [5000 ms]

@solsson commented Jan 9, 2018

Let's assume that the issue(s?) is caused by min.insync.replicas=2. #108 and #114 were both caused by this change.

I fail to reproduce the test issue. Kafka Manager works fine too. @allquantor Did you upgrade an existing cluster that used to have min.insync.replicas=1? Can you see any broker log messages like those in #108? I don't know if console-consumer and console-producer can produce debug output, like -d broker,topic does with kafkacat.

The test can be improved: min.insync.replicas can effectively be overridden per producer by setting acks explicitly. Also, the topic creation job could be removed. It specifies --replication-factor=2, and I'm not sure how that interacts with min.insync.replicas. But the kafkacat test has the same topic creation, and it looks ok.
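As a sketch (reusing names from the existing test manifests), an explicit acks override for the console producer and the job's topic creation would look roughly like:

./bin/kafka-console-producer.sh --broker-list bootstrap.kafka:9092 --topic test-produce-consume --producer-property acks=all
./bin/kafka-topics.sh --zookeeper zookeeper.kafka:2181 --create --if-not-exists --topic test-produce-consume --partitions 1 --replication-factor 2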

solsson added a commit that referenced this issue on Jan 9, 2018: "… which means we get the default number of replicas from broker config", for #116
@solsson commented Jan 9, 2018

I noticed when doing e059690 that the default min.insync.replicas is 1. Thus the tests were probably unaffected by the new default of 2.

The change might still have affected Kafka internals, as in #114.

@albamoro commented Jan 9, 2018

I'm seeing the same issue as @StevenACoffman, but with the stock kafka-console-consumer, using it both inside and outside the cluster: the consumer is stuck in a loop, marking the coordinator (resolved with id = max(int32)) as dead.

@solsson commented Jan 9, 2018

@albamoro Any clues in broker logs?

@albamoro commented Jan 9, 2018

None that I could see on the server side, but then I'm fairly new to them - I've upped the logging and been tailing the kafka-request.log to not much avail so far.

Forgot to mention an important detail: the console works when using the old consumer, i.e., pointing to --zookeeper rather than --bootstrap-server. Clients and server versions are aligned as they are based on the same image.
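For illustration, the two invocations are roughly (broker and topic names as in my setup):

# old consumer, works
./bin/kafka-console-consumer.sh --zookeeper zookeeper.kafka:2181 --topic xxxx --from-beginning
# new consumer, loops on "Marking the coordinator ... dead"
./bin/kafka-console-consumer.sh --bootstrap-server kafka-0.broker.kafka.svc.cluster.local:9092 --topic xxxx --from-beginning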

Client-wise this is the block that keeps on repeating - note the node's id is lost between lines 2 & 3. I couldn't tell how relevant that is.

2018-01-09 14:20:32 DEBUG AbstractCoordinator:183 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Sending GroupCoordinator request to broker 192.168.1.21:32402 (id: 2 rack: null)
2018-01-09 14:20:32 DEBUG AbstractCoordinator:183 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Received GroupCoordinator response ClientResponse(receivedTimeMs=1515507632283, latencyMs=1, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=251), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=192.168.1.21:32400 (id: 0 rack: null)))
2018-01-09 14:20:32 INFO  AbstractCoordinator:341 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Discovered coordinator 192.168.1.21:32400 (id: 2147483647 rack: null)
2018-01-09 14:20:32 DEBUG NetworkClient:183 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Initiating connection to node 192.168.1.21:32400 (id: 2147483647 rack: null)
2018-01-09 14:20:32 DEBUG AbstractCoordinator:177 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Disabling heartbeat thread
2018-01-09 14:20:32 INFO  AbstractCoordinator:336 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] (Re-)joining group
2018-01-09 14:20:32 DEBUG AbstractCoordinator:189 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Sending JoinGroup ((type: JoinGroupRequest, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0, sessionTimeout=10000, rebalanceTimeout=300000, memberId=consumer-1-5a07a421-70d7-4ae0-9be4-cbf0d2babfa1, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@21918021)) to coordinator 192.168.1.21:32400 (id: 2147483647 rack: null)
2018-01-09 14:20:32 DEBUG Selector:195 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 2147483647
2018-01-09 14:20:32 DEBUG NetworkClient:183 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Completed connection to node 2147483647. Fetching API versions.
2018-01-09 14:20:32 DEBUG NetworkClient:183 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Initiating API versions fetch from node 2147483647.
2018-01-09 14:20:32 DEBUG NetworkClient:189 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Recorded API versions for node 2147483647: (Produce(0): 0 to 5 [usable: 5], Fetch(1): 0 to 6 [usable: 6], ListOffsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 5 [usable: 5], LeaderAndIsr(4): 0 to 1 [usable: 1], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 4 [usable: 4], ControlledShutdown(7): 0 to 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0], AlterReplicaLogDirs(34): 0 [usable: 0], DescribeLogDirs(35): 0 [usable: 0], SaslAuthenticate(36): 0 [usable: 0], CreatePartitions(37): 0 [usable: 0])
2018-01-09 14:20:32 DEBUG AbstractCoordinator:183 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Received successful JoinGroup response: org.apache.kafka.common.requests.JoinGroupResponse@35b15e13
2018-01-09 14:20:32 DEBUG ConsumerCoordinator:189 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Performing assignment using strategy range with subscriptions {consumer-1-5a07a421-70d7-4ae0-9be4-cbf0d2babfa1=Subscription(topics=[clickstream_raw])}
2018-01-09 14:20:32 DEBUG ConsumerCoordinator:183 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Finished assignment for group: {consumer-1-5a07a421-70d7-4ae0-9be4-cbf0d2babfa1=Assignment(partitions=[clickstream_raw-0])}
2018-01-09 14:20:32 DEBUG AbstractCoordinator:189 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Sending leader SyncGroup to coordinator 192.168.1.21:32400 (id: 2147483647 rack: null): (type=SyncGroupRequest, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0, generationId=63, memberId=consumer-1-5a07a421-70d7-4ae0-9be4-cbf0d2babfa1, groupAssignment=consumer-1-5a07a421-70d7-4ae0-9be4-cbf0d2babfa1)
2018-01-09 14:20:32 DEBUG AbstractCoordinator:183 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] SyncGroup failed:
2018-01-09 14:20:32 INFO  AbstractCoordinator:341 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Marking the coordinator 192.168.1.21:32400 (id: 2147483647 rack: null) dead
2018-01-09 14:20:32 DEBUG NetworkClient:189 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Manually disconnected from 2147483647. Removed requests: .
2018-01-09 14:20:32 DEBUG AbstractCoordinator:183 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-bc5d5601-5f1a-4b96-9193-7cb9e6c553fc--751033765-driver-0] Sending GroupCoordinator request to broker 192.168.1.21:32402 (id: 2 rack: null)

@solsson commented Jan 10, 2018

From the port numbers it looks like you get the "outside" listener (#78). I'm quite curious how that can happen in the tests. An ordering issue was resolved in 4c202f4. Might #99 (comment) be helpful to you too?

@albamoro commented Jan 10, 2018

The same happens using the inside listener. I had attached the external client's logs because those are the only ones I've been able to configure for DEBUG logging. You can see below the console's output when used within my Minikube, v0.24.1.

Re. commit 4c202f4, my repo is pointing at bbed23e, so I presume the fix applies? I haven't checked #99 yet - I was planning on rolling back to early Nov as @StevenACoffman had done and working from there.

root@pzoo-1:/opt/kafka/bin# ./kafka-console-consumer.sh --bootstrap-server kafka-0.broker.kafka.svc.cluster.local:9092 --topic xxxx --from-beginning
[2018-01-10 10:39:26,845] INFO ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [kafka-0.broker.kafka.svc.cluster.local:9092]
        check.crcs = true
        client.id =
        connections.max.idle.ms = 540000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = console-consumer-77574
        heartbeat.interval.ms = 3000
        interceptor.classes = null
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 305000
        retry.backoff.ms = 100
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig)
[2018-01-10 10:39:26,893] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2018-01-10 10:39:26,893] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser)
[2018-01-10 10:39:26,973] INFO [Consumer clientId=consumer-1, groupId=console-consumer-77574] Discovered coordinator kafka-0.broker.kafka.svc.cluster.local:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-01-10 10:39:26,976] INFO [Consumer clientId=consumer-1, groupId=console-consumer-77574] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-01-10 10:39:26,976] INFO [Consumer clientId=consumer-1, groupId=console-consumer-77574] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-01-10 10:39:27,031] INFO [Consumer clientId=consumer-1, groupId=console-consumer-77574] Marking the coordinator kafka-0.broker.kafka.svc.cluster.local:9092 (id: 2147483647 rack: null) dead (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-01-10 10:39:27,144] INFO [Consumer clientId=consumer-1, groupId=console-consumer-77574] Discovered coordinator kafka-0.broker.kafka.svc.cluster.local:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-01-10 10:39:27,145] INFO [Consumer clientId=consumer-1, groupId=console-consumer-77574] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

@albamoro commented Jan 10, 2018

@solsson, just to confirm your comment above (#116 (comment)): changing to min.insync.replicas=1 while retaining default.replication.factor=3 clears both of these issues in my setup: the produce-consume test failing, and the console consumer not working with --bootstrap-server.
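Concretely, this is the broker config combination that works for me (server.properties fragment):

min.insync.replicas=1
default.replication.factor=3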

@StevenACoffman commented Jan 10, 2018

@albamoro I am confused by your last comment:

clears these issues in my setup: produce-consume test fails & console consumer cannot use --bootstrap-server

Does changing those values cause the failing test issue or fix the issue?

@albamoro commented Jan 10, 2018

Yup, sorry, I thought it might not be entirely clear :) Changing it makes the issues go away. I haven't checked the implications of the acks setting; I'm currently using "all" from my producers.

@albamoro

I have tested the min.insync.replicas setting against https://github.com/wurstmeister/kafka-docker on docker-compose and the results are the same, i.e. with min.insync.replicas=2 the console consumer fails when using --bootstrap-server rather than zookeeper. I don't understand how @solsson's local environment could pass the tests with ISR=2. Otherwise I'd think we are either missing a trick here with regard to the Kafka config, or we are inheriting a bug from wurstmeister's Dockerfile, which as I understand it is the basis of Yolean's image?

@StevenACoffman

So I have three different Kubernetes clusters, and in each I have done kubectl delete -f on each file in reverse order from how they were created, removed any lingering detached persistent volumes, and reinstalled everything from latest master, and I get different results from these tests. In one they succeed, and in the other two they fail. On one of the failing clusters, switching to the #121 branch:

  • producer
++ date --iso-8601=ns -u
+ echo '--- start produce-consume-z2vn2 2018-01-10T15:46:16,496865915+00:00 ---'
+ ./bin/kafka-topics.sh --zookeeper zookeeper.kafka:2181 --describe --topic test-produce-consume
+ ./bin/kafka-console-producer.sh --broker-list bootstrap.kafka:9092 --topic test-produce-consume --producer-property acks=-1
+ tail -f /shared/produce.tmp
[2018-01-10 16:15:36,241] WARN [Producer clientId=console-producer] Got error produce response with correlation id 185 on topic-partition test-produce-consume-0, retrying (2 attempts left). Error: NOT_LEADER_FOR_PARTITION (org.apache.kafka.clients.producer.internals.Sender)
[2018-01-10 16:16:57,574] ERROR Error when sending message to topic test-produce-consume with key: null, value: 62 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-produce-consume-0: 1501 ms has passed since batch creation plus linger time
[2018-01-10 16:17:07,615] ERROR Error when sending message to topic test-produce-consume with key: null, value: 62 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-produce-consume-0: 1540 ms has passed since batch creation plus linger time
  • consumer:
[2018-01-10 15:46:45,647] WARN [Consumer clientId=consumer-1, groupId=console-consumer-240] Error while fetching metadata with correlation id 2 : {test-produce-consume=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2018-01-10 16:14:43,887] WARN [Consumer clientId=consumer-1, groupId=console-consumer-240] Connection to node 2147483645 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

@StevenACoffman commented Jan 10, 2018

What is weird is that I can install the outside services, and kafkacat works fine against them from my laptop:
kafkacat -C -b $BOOTSTRAP -t $TOPIC -o -10

@solsson commented Jan 10, 2018

@StevenACoffman With outside services, are you bootstrapping using the address of all three brokers?

I have different results from these tests

Could it be that the bootstrap service (in combination with acks) is the culprit? It does introduce the kind of randomness that would explain different behavior in identical setups. If so, does this only affect java clients? Have any of you spotted issues with the kafkacat test?

In 3.0 we used a single broker for bootstrap, to avoid errors for single-node setups. In 2.1 we used the full addresses of all three brokers.
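For illustration, bootstrapping against all brokers instead of the single bootstrap.kafka service would look roughly like this (assuming three brokers, kafka-0 through kafka-2, and the in-cluster DNS names):

kafkacat -C -b kafka-0.broker.kafka.svc.cluster.local:9092,kafka-1.broker.kafka.svc.cluster.local:9092,kafka-2.broker.kafka.svc.cluster.local:9092 -t test-produce-consume -o -10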

  • ./bin/kafka-topics.sh --zookeeper zookeeper.kafka:2181 --describe --topic test-produce-consume

I'm just curious, why didn't this print some topic metadata? Will it do so on pod restart?

@albamoro

In my case, the kafkacat test always passed, regardless of the min.insync.replicas setting.
I wouldn't want to unnecessarily add to the confusion, but if the bootstrap service were to blame, why do I get the same error using docker-compose and the wurstmeister/kafka image?

@StevenACoffman commented Jan 10, 2018

By the way, it's very handy to do this:

kubectl exec -it zoo-0 -n kafka -- /bin/bash

Then you can just run commands inside the cluster like:

./bin/kafka-topics.sh --zookeeper zookeeper.kafka:2181 --describe --topic test-produce-consume

When I diff the results, I get identical output except for one block. The passing test cluster:

Topic:test-produce-consume	PartitionCount:1	ReplicationFactor:2	Configs:
	Topic: test-produce-consume	Partition: 0	Leader: 2	Replicas: 2,1	Isr: 2,1

The failing test cluster:

Topic:test-produce-consume	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: test-produce-consume	Partition: 0	Leader: 1	Replicas: 1,0,2	Isr: 2,0,1

@StevenACoffman commented Jan 10, 2018

@solsson Using current master bbed23e I adjusted the min.insync.replicas to 1 (from 2), and set num.partitions to 5 instead of 1 (side note: we use 15 in our normal non-kubernetes infrastructure kafka servers). No other alterations were made.

With these new settings, in all three of my Kubernetes clusters, I can repeatably and reliably tear down all the Kubernetes resources from this repository (including lingering persistent volumes) and re-apply them with no problems. With min.insync.replicas=2 it sometimes works and sometimes does not, and it never appears to recover when it doesn't work. 6 minutes is enough for all tests to pass, if they are ever going to, in my AWS Kubernetes clusters.

By the way, the Yahoo kafka-manager problem was my own doing. I had somehow gotten into the habit of incorrectly entering the Kafka bootstrap service bootstrap.kafka:9092 instead of the correct ZooKeeper service zookeeper.kafka:2181 when adding the cluster in kafka-manager. This went about as well as expected.

I appreciate your careful attention, patience and help during all of this. #122 was very helpful.

@StevenACoffman commented Jan 10, 2018

I'm not sure if the sporadic successful runs were attributable to changes in kafka internals because I occasionally did this:

kubectl exec -it zoo-0 -n kafka -- /bin/bash
#-- inside kube shell next
cd /opt/kafka/bin
./kafka-topics.sh --create --if-not-exists --zookeeper zookeeper.kafka:2181 --replication-factor 2 --partitions 15  --topic k8s-firehose --config retention.ms=6912500000

The topic that was created was not the same as test-produce-consume, yet at least one of the clusters on which I did that had tests passing and showed the differing output for:

./bin/kafka-topics.sh --zookeeper zookeeper.kafka:2181 --describe --topic test-produce-consume

@StevenACoffman commented Jan 11, 2018

I went through and diffed our outside-of-Kubernetes Kafka server.properties with the one here, and I noticed two big differences. The first:

num.partitions=15 #you have 1
default.replication.factor=2 # you have 3
# we do not specify min.insync.replicas

The second is that we omit this section:

    ############################# Internal Topic Settings  #############################
    # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
    # For anything other than development testing, a value greater than 1 is recommended to ensure availability, such as 3.
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1

That last section makes me wonder if that was my real problem all along.
I can alter offsets.topic.replication.factor to set it to 3, but I wonder if the other two should also be altered in production.
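A sketch of what production-leaning values for that section might look like (the specific numbers here are my assumption, not what this repo ships):

offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2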

@StevenACoffman commented Jan 11, 2018

Setting min.insync.replicas=2 makes a test or two fail, and min.insync.replicas=1 makes the tests pass.

Changing to offsets.topic.replication.factor=3 and omitting transaction.state.log.replication.factor and transaction.state.log.min.isr seems like a good idea outside of minikube, but it doesn't appear to affect whether the tests pass or fail with either min.insync.replicas value.

@solsson commented Jan 11, 2018

@albamoro

I have tested the min.insync.replicas setting against https://github.com/wurstmeister/kafka-docker on docker-compose and the results are the same, i.e. with min.insync.replicas=2 the console consumer fails when using --bootstrap-server rather than zookeeper.

Sorry I didn't pay enough attention to this finding. Maybe docker-compose can provide a better test case for min.insync.replicas. Can you link to a gist or pastebin with your compose file?

wurstmeister's dockerfile as I understand it is the basis of yolean's image?

In fact I think Confluent's image looks inspired by wurstmeister's. I started out with Confluent's image, but with Kubernetes we have the ConfigMap feature so there's less need for using environment variables with services that use an actual config file. I wanted an image that was as close as possible to downloading and installing from https://kafka.apache.org/. There's some background on this choice in #46.

This thread is very important. I consider reverting the change to min.insync.replicas. But if this repo is opinionated about anything, it's that Kafka can be used in small scale AND with business critical data. Two acks from three replicas sounds to me like an essential part of that, and I'd really like it to be the default for producers that don't explicitly configure acks.

I will try to find more time to research this myself. Thanks for your efforts @albamoro and @StevenACoffman.

solsson changed the title from "Test failed?" to "Test failures for console producer/consumer with min.insync.replicas=2" on Jan 11, 2018
@albamoro

@StevenACoffman

I'm not sure if the sporadic successful runs were attributable to changes in kafka internals because I occasionally did this:

kubectl exec -it zoo-0 -n kafka -- /bin/bash
#-- inside kube shell next
cd /opt/kafka/bin
./kafka-topics.sh --create --if-not-exists --zookeeper zookeeper.kafka:2181 --replication-factor 2 --partitions 15  --topic k8s-firehose --config retention.ms=6912500000

This is interesting and could have some bearing on the question: my tests fail when isr=2 and I use the new, bootstrap-server-based consumer. My topics are auto-created on producer request, so maybe creating them using --zookeeper does something differently?

@solsson I've left the docker-compose.yml here: https://gist.github.com/albamoro/a56ed9aff40a10c2580d678a55b2b5d9

You'll need to create the backbone network beforehand:

docker network create backbone

I start it using:

docker-compose up -d --scale kafka=3

I very much appreciate your efforts too, thank you all, including @allquantor for raising it.

@StevenACoffman

@solsson I appreciate the goal of making something that works for developers and works in production.

We've used Kafka in production for 5 years at Ithaka. In November alone, our 5-node production Kafka cluster had a topic that handled 30,419,294 messages per day, and another that was about half that. Lots of other busy topics too. We don't specify min.insync.replicas (so it defaults to 1) and it hasn't been a problem so far as we know. We do set num.partitions=15 and default.replication.factor=2.

I'm not sure min.insync.replicas=2 is necessary for production level durability.

With that said, it sounds very nice in theory, and I'm very interested in understanding how you don't get the same results I do. If there's something else that needs to be turned on to make it work, then I'm all for it.

@solsson commented Jan 15, 2018

I've contemplated this while re-reading relevant parts of the kafka book... with acks=1 the leader doesn't know if any replica is up-to-date or not. This has worked well for us too, but so has min.insync.replicas=2 in QA for the last month or so.

Excerpts from Neha Narkhede, Gwen Shapira, and Todd Palino, "Kafka: The Definitive Guide":

"Only in-sync replicas are eligible to be elected as partition leaders in case the existing leader fails."

"Produced messages are considered “committed” when they were written to the partition on all its in-sync replicas (but not necessarily flushed to disk). Producers can choose to receive acknowledgments of sent messages when the message was fully committed, when it was written to the leader, or when it was sent over the network.
Messages that are committed will not be lost as long as at least one replica remains alive.
Consumers can only read messages that are committed."

"As we’ve seen, there are cases where even though we configured a topic to have three replicas, we may be left with a single in-sync replica. If this replica becomes unavailable, we may have to choose between availability and consistency. This is never an easy choice. Note that part of the problem is that, per Kafka reliability guarantees, data is considered committed when it is written to all in-sync replicas, even when all means just one replica and the data could be lost if that replica is unavailable."

In our case, as we grow with Kafka, I think it is important to require at least 1 successful leader->follower replication so that we catch cluster problems sooner rather than later. With Kubernetes loss of a node should be a non-event. It probably can be with commit="written to the leader" thanks to graceful shutdown, but I'd like higher consistency guarantees.

Thus we'll keep running with 2 and I will report any findings here or in referencing issues. Clients can always opt for acks=1 if/when we need workarounds. I think the test issues indicate a configuration problem, but I have been unable to track it down. It might be affected by auto.create.topics too. In order to investigate further, we've decided that monitoring is first priority. I will try to resume from #49 #93 #96 #112 and quite possibly investigate https://github.com/linkedin/Burrow. This'll have to take some calendar time.
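For example, a producer opting out of the stricter guarantee could be run like this (a sketch, reusing the test's console producer):

./bin/kafka-console-producer.sh --broker-list bootstrap.kafka:9092 --topic test-produce-consume --producer-property acks=1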

With that said, it sounds very nice in theory, and I'm very interested in understanding how you don't get the same results I do. If there's something else that needs to be turned on to make it work, then I'm all for it.

@StevenACoffman Good summary of the path we're taking at Yolean :)

@solsson commented Jan 19, 2018

With #125 and #128 we'll be able to look for consumer lag and under-replicated partitions respectively, which I hope will help the investigation in this issue. With prometheus the label to filter on is name="UnderReplicatedPartitions".

@solsson commented Jan 19, 2018

Actually, there's been a readiness test checking for under-replicated partitions since #95, which I had forgotten about :) Prometheus + #128 lets you alert on sum({name="UnderReplicatedPartitions"}) > 0, and the replicated-partitions test can be used to list them.
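As a rough sketch, that expression can be checked against a Prometheus instance with the query API (the hostname here is an assumption):

curl -sG 'http://prometheus.monitoring.svc:9090/api/v1/query' --data-urlencode 'query=sum({name="UnderReplicatedPartitions"})'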

I still haven't investigated if there's a difference between auto-created topics and those created through the CLI.

@lenadroid commented Jan 19, 2018

@solsson Hi! I'm getting:

kubectl --namespace=test-kafka logs produce-consume-xqhbk producer
++ date --iso-8601=ns -u
+ echo '--- start produce-consume-xqhbk 2018-01-19T23:27:05,831153320+00:00 ---'
+ tail -f /shared/produce.tmp
+ ./bin/kafka-console-producer.sh --broker-list bootstrap.kafka:9092 --topic test-produce-consume
[2018-01-19 23:27:07,325] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {test-produce-consume=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

With:

kubectl --namespace=test-kafka get pods
NAME                    READY     STATUS    RESTARTS   AGE
kafkacat-jtv2f          3/3       Running   0          12m
produce-consume-xqhbk   2/3       Running   0          12m

Can it be related to this issue?

Also seeing errors like this in Zookeeper:

[2018-01-19 23:08:41,004] INFO Defaulting to majority quorums (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2018-01-19 23:08:41,008] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2018-01-19 23:08:41,008] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2018-01-19 23:08:41,008] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2018-01-19 23:08:41,021] INFO Starting quorum peer (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2018-01-19 23:08:41,041] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.quorum.QuorumPeer)
[2018-01-19 23:08:41,041] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.quorum.QuorumPeer)
[2018-01-19 23:08:41,041] INFO QuorumPeer communication is not secured! (org.apache.zookeeper.server.quorum.QuorumPeer)
[2018-01-19 23:08:41,041] INFO quorum.cnxn.threads.size set to 20 (org.apache.zookeeper.server.quorum.QuorumPeer)
[2018-01-19 23:08:41,044] INFO currentEpoch not found! Creating with a reasonable default of 0. This should only happen when you are upgrading your installation (org.apache.zookeeper.server.quorum.QuorumPeer)
[2018-01-19 23:08:41,076] INFO acceptedEpoch not found! Creating with a reasonable default of 0. This should only happen when you are upgrading your installation (org.apache.zookeeper.server.quorum.QuorumPeer)
[2018-01-19 23:08:41,087] INFO My election bind port: /0.0.0.0:3888 (org.apache.zookeeper.server.quorum.QuorumCnxManager)
[2018-01-19 23:08:41,095] INFO LOOKING (org.apache.zookeeper.server.quorum.QuorumPeer)
[2018-01-19 23:08:41,095] INFO New election. My id =  5, proposed zxid=0x0 (org.apache.zookeeper.server.quorum.FastLeaderElection)
[2018-01-19 23:08:41,098] WARN Cannot open channel to 1 at election address pzoo-0.pzoo:3888 (org.apache.zookeeper.server.quorum.QuorumCnxManager)
java.net.UnknownHostException: pzoo-0.pzoo
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:589)
	at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:562)
	at org.apache.zookeeper.server.quorum.QuorumCnxManager.toSend(QuorumCnxManager.java:538)
	at org.apache.zookeeper.server.quorum.FastLeaderElection$Messenger$WorkerSender.process(FastLeaderElection.java:452)
	at org.apache.zookeeper.server.quorum.FastLeaderElection$Messenger$WorkerSender.run(FastLeaderElection.java:433)
	at java.lang.Thread.run(Thread.java:748)

@solsson commented Jan 22, 2018

I consider reverting the change to min.insync.replicas

I don't think I consider that anymore :) This is slightly dated, but I found it useful for our persistent data use case: https://www.slideshare.net/gwenshap/kafka-reliability-when-it-absolutely-positively-has-to-be-there

In addition, when working with #134, I learned that clients can't specify >1 acks. They can only choose to adhere to "all". It's an argument for min.insync.replicas > 1 that clients (or topics) can always specify 0 or 1 to opt out.

Also, I've tried to double-check that min.insync.replicas=2 actually means leader + 1 follower, not 2 followers. It's only implicit in the Kafka docs, but here they write "The leader is considered one of the in-sync replicas.".
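A quick way to sanity-check this on a topic is the ISR in the describe output; with replication factor 3 and min.insync.replicas=2, acks=all produces should keep working as long as the leader plus at least one follower stay in sync (a sketch):

./bin/kafka-topics.sh --zookeeper zookeeper.kafka:2181 --describe --topic test-produce-consume
# Isr: 2,1  -> leader plus one follower in sync; acks=all produces succeed
# Isr: 2    -> only the leader in sync; acks=all produces should fail with NOT_ENOUGH_REPLICAS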

@lenadroid :

[2018-01-19 23:27:07,325] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {test-produce-consume=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

I've seen this too, frequently since #107. It might be normal with auto-created topics. But there's actually a timing issue at test start (which, I realized today, @albamoro @allquantor @StevenACoffman could be a cause of the original issue). You never know which container gets to create the topic first: the consumer (which oddly is allowed to create topics), the producer, or the topic creation job. If it's the job, you won't get any errors in the producer or consumer, but on the other hand you get replication factor = 2, which is different from the default introduced in #107. I find kafka-manager (#83) really useful for looking at topic config.
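One way to take that race out of the test would be to always create the topic up front, before the producer and consumer containers start, e.g.:

./bin/kafka-topics.sh --zookeeper zookeeper.kafka:2181 --create --if-not-exists --topic test-produce-consume --partitions 1 --replication-factor 2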

@lenadroid The Zookeeper issue must be something else. Please create a separate issue.

@solsson commented Feb 3, 2018

I think this issue will conclude the 3.1 cycle, dubbed "The painstaking path to min.insync.replicas=2". Do you think #140 has fixed the issue(s) here?

@solsson commented Feb 8, 2018

I released 3.1.0 now. Will leave this issue open, until I get positive reports or until I find the time to go through current open issues and close inactive ones.

@StevenACoffman

I've only had my three non-toy clusters running with this for 2 days, but so far it's been very solid, even with min.insync.replicas=2. This is very close to our outside-of-Kubernetes cluster settings now.

@solsson commented Feb 9, 2018

Sounds good. Made all the difference for me to learn about these additional replication factor props. I will close this then.

solsson closed this as completed Feb 9, 2018
@StevenACoffman commented Feb 9, 2018

For reference, we use this configuration in our Kafka clusters outside of kubernetes:
server.properties.txt

I have summarized the differences below. We have upgraded our clusters many times, so please don't assume these are still current best practices, even if they've proven fairly reliable for us. Aside from retention policies, I find some very intriguing differences and I wonder if you have any opinions on them @solsson

| setting | Ithaka Kafka | Yolean | Default |
| --- | --- | --- | --- |
| num.network.threads | 2 | 3 | 3 |
| num.io.threads | 2 | 8 | 8 |
| socket.send.buffer.bytes | 1048576 | 102400 | 102400 |
| socket.receive.buffer.bytes | 1048576 | 102400 | 102400 |
| socket.request.max.bytes | 104857600 | 102400 | 104857600 |
| num.partitions | 15 | 1 | |
| default.replication.factor | 2 | | 1 |
| offsets.topic.replication.factor | | 3 | 3 |
| log.flush.interval.messages | 10000 | | 9223372036854775807 |
| log.flush.interval.ms | 1000 | | 9223372036854775807 |
| log.retention.hours | 4 | -1 | 168 |
| log.retention.bytes | 104857600 | -1 | |
| log.segment.bytes | 26214400 | 1073741824 | 1073741824 |
| log.cleanup.interval.mins* | 1 | | 5 |
| log.retention.check.interval.ms | 60000 | 300000 | 300000 |
| auto.leader.rebalance.enable | true | | true |
| group.initial.rebalance.delay.ms | | 0 | 3000 |
| leader.imbalance.per.broker.percentage | 10 | | 10 |
| leader.imbalance.check.interval.seconds | 3600 | | 300 |
| zookeeper.connection.timeout.ms | 1000000 | 6000 | 6000 |
| zookeeper.session.timeout.ms | 30000 | | 6000 |
| kafka.metrics.polling.interval.secs | 60 | | ? |

* log.cleanup.interval.mins has been renamed log.retention.check.interval.ms

For very important topics with settings that differ from the default, we have a few external programs that are constantly ensuring those topics exist and have the proper retention periods (e.g. 1 week instead of 4 hours).

We have considered flipping this and defaulting to retaining for 1 week and changing the topic settings for all topics not on the whitelist.
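For illustration, adjusting retention on a single topic with the stock CLI looks roughly like this (topic name reused from above; 604800000 ms is one week):

./bin/kafka-configs.sh --zookeeper zookeeper.kafka:2181 --alter --entity-type topics --entity-name k8s-firehose --add-config retention.ms=604800000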

@solsson commented Feb 9, 2018

Wow, that's some differences for sure :) We haven't done any performance tuning, and I think we're on recent defaults.

With hindsight, and #140 fresh in memory, we shouldn't have started from the sample server.properties out of the Kafka distribution. We should have left server.properties completely empty instead, except for the things we care about: retention = -1 as the default, so topics are "persistent" by default, as we keep our primary records of domain data in Kafka.

For example, the essential auto.leader.rebalance.enable is true in this repo's setup too, but only thanks to broker defaults.

@StevenACoffman commented Feb 9, 2018

I updated the table to include a column for defaults. Now that 3.1 is released, would removing default values (or commenting them out) be warranted here? That would allow documentation to highlight the reason for any divergence from defaults.

Our situation is that we initially set all values explicitly, and painstakingly performance-tuned when issues arose. However, as we upgraded multiple times and the recommended defaults changed, we didn't retain the history of which values were intentional differences for our usage and which were due to outdated adherence to "best practices" that were contextual to those older versions.

Part of the appeal of this project is it affords us the ability to cheaply experiment and revisit these settings.

@solsson commented Feb 9, 2018

I created #148 to try to pursue that idea. It's not too late to make amends :)
