
Kafka input: consumer group not behaving as expected when restricting the number of topic partitions #1806

Closed
elukey opened this issue Mar 17, 2023 · 11 comments
Labels: kafka (issues relating to Kafka), needs investigation (it looks as though we have all the information needed, but investigation is required)

Comments

elukey commented Mar 17, 2023

Hi folks!

At Wikimedia we are happy users of Benthos for a stream processing pipeline. We pull what we call "Webrequests" (basically a JSON dump of every HTTP request hitting our front-end caches) from Kafka, sample and enrich them with GeoIP and additional data, and then re-insert them into Kafka (so other tools can consume the stream).

Since the total volume of events to pull from Kafka is huge, we decided to test the kafka input's feature of selecting the number of topic partitions to pull from. This seemed to work nicely until a few days ago, when a consumer group rebalance was needed (a host was decommissioned and a new one was added in its place), leading to https://phabricator.wikimedia.org/T331801. We can't really explain what the issue was, but it seemed that Benthos started pulling from fewer partitions than expected, leading to inconsistent results later on in our pipeline (we have tools that visualize the data we save in Kafka, and the traffic flows were totally off).

We tried changing the consumer group name, resetting its state in Kafka, etc. Nothing fixed the issue except changing the kafka input config to pull from all topic partitions, instead of selecting a range of them. We have no idea why it worked before; maybe it was by chance/luck, and the rebalance made the issue more visible?

This is a simplified version of the config that we use:

http:
  enabled: true
  address: "0.0.0.0:${PORT}"
  debug_endpoints: true

input:
  label: "webrequest_sampled_in"
  kafka:
    target_version: 1.1.0
    addresses: [ "${KAFKA_BROKERS}" ]
    topics: [ "${KAFKA_TOPICS}" ]
    tls:
      enabled: true
      root_cas_file: /etc/ssl/certs/wmf-ca-certificates.crt
      skip_cert_verify: true # See https://phabricator.wikimedia.org/T291905
    consumer_group: 'benthos-webrequest-sampled-live-test-elukey'
    start_from_oldest: false
    batching:
      count: 2048
      period: 100ms

pipeline:
  threads: 5
  processors:
  - label: "drop"
    bloblang: |
      root = if this.ip != "-" && this.sequence != "-" && this.dt != "-" && this.sequence % env("SAMPLING").number() != 0 { deleted() }

  - label: "geoip"
    bloblang: |
      root = this
      root.as_number = this.ip.geoip_asn(path: "/usr/share/GeoIP/GeoIP2-ISP.mmdb").AutonomousSystemNumber
      root.isp = this.ip.geoip_isp(path: "/usr/share/GeoIP/GeoIP2-ISP.mmdb").ISP
      root.country_code = this.ip.geoip_country(path: "/usr/share/GeoIP/GeoIP2-Country.mmdb").Country.IsoCode
      root.continent = this.ip.geoip_country(path: "/usr/share/GeoIP/GeoIP2-Country.mmdb").Continent.Names.en

  - label: "meta"
    bloblang: |
      root = this
      root.webrequest_source = (
          meta("kafka_topic").split("_").index(1)
      ).catch("-")

  - label: "analytics"
    bloblang: |
      root = this
      root.is_pageview = "-"
      root.x_analytics_data = (
        (this.x_analytics.split(";").map_each(
          field -> {(field.split("=").index(0)): (field.split("=").index(1))}
        )).squash()
      ).catch({})

      root.is_from_public_cloud = root.x_analytics_data.public_cloud.or("0")
      root.requestctl = root.x_analytics_data.requestctl.or("0")
      root.is_debug = root.x_analytics_data.is_debug.or("0")
      root.client_port = root.x_analytics_data.client_port.or("-")

      root.x_analytics_data = deleted()
      root.x_analytics = deleted()

  - label: "tls_data"
    bloblang: |
      root = this
      root.tls_data = (
          (this.tls.split(";").map_each(
              field -> {
                 (field.split("=").index(0)): (field.split("=").index(1))
              })
          ).squash()
      ).catch({})
      root.tls_version = root.tls_data.get("vers").catch("-")
      root.tls_key_exchange = root.tls_data.get("keyx").catch("-")
      root.tls_auth = root.tls_data.get("auth").catch("-")
      root.tls_cipher = root.tls_data.get("ciph").catch("-")
      root.tls_data = deleted()
      root.tls = deleted()

output:
  label: ""
  stdout:
    codec: lines

metrics:
  prometheus:
    use_histogram_timing: true
    add_process_metrics: true
    add_go_metrics: true
  mapping: |
    root = "benthos_" + this


tests:
  - name: tls_data parsing
    target_processors: 'tls_data'
    environment: {}
    input_batch:
      - content: '{"tls": "vers=TLSv1.3;keyx=UNKNOWN;auth=ECDSA;ciph=AES-256-GCM-SHA384;prot=h2;sess=new"}'
    output_batches:
      -
        - json_equals:
             tls_version: TLSv1.3
             tls_auth: ECDSA
             tls_cipher: AES-256-GCM-SHA384
             tls_key_exchange: UNKNOWN

  - name: analytics parsing
    target_processors: 'analytics'
    environment: {}
    input_batch:
      - content: '{"x_analytics":"https=1;client_port=4181;nocookies=1;public_cloud=1"}'
    output_batches:
      -
        - json_equals:
             client_port: "4181"
             is_debug: "0"
             is_from_public_cloud: "1"
             is_pageview: "-"
             requestctl: "0"

And these are the env variables:

PORT=4151
KAFKA_BROKERS=kafka-jumbo1001.eqiad.wmnet:9093,kafka-jumbo1002.eqiad.wmnet:9093,kafka-jumbo1003.eqiad.wmnet:9093,kafka-jumbo1004.eqiad.wmnet:9093,kafka-jumbo1005.eqiad.wmnet:9093,kafka-jumbo1006.eqiad.wmnet:9093,kafka-jumbo1007.eqiad.wmnet:9093,kafka-jumbo1008.eqiad.wmnet:9093,kafka-jumbo1009.eqiad.wmnet:9093
export KAFKA_TOPICS=webrequest_text
export SAMPLING=128

If I use KAFKA_TOPICS=webrequest_text, I see the following in Kafka:

elukey@kafka-jumbo1001:~$ kafka consumer-groups --describe --group benthos-webrequest-sampled-live-test-elukey --state
[..]

COORDINATOR (ID)                        ASSIGNMENT-STRATEGY       STATE                #MEMBERS
kafka-jumbo1009.eqiad.wmnet:9092 (1009) range                     Stable               1

Otherwise, if I use KAFKA_TOPICS=webrequest_text:1-6, I see the following:

elukey@kafka-jumbo1001:~$ kafka consumer-groups --describe --group benthos-webrequest-sampled-live-test-elukey --state
[..]

COORDINATOR (ID)                        ASSIGNMENT-STRATEGY       STATE                #MEMBERS
kafka-jumbo1009.eqiad.wmnet:9092 (1009)                           Empty                0

The Kafka version that we use is 1.1.0, so we are wondering if there is any gotcha that we are still missing about how to consume from a range of partitions instead of the full topic. Any help would be really appreciated!

Jeffail commented Mar 17, 2023

Hey @elukey, I've recently been looking into this a bit. The kafka version might be a clue, as I've been struggling to reproduce the issue even though others have reported similar problems (#1058, #1802).

There's more info here overall (thanks for the write up) so I'm going to use this as the conglomerate of "dodgy kafka consumer" issues.

Unfortunately, a common theme of our Kafka woes is some niche situation or unpopular configuration running against our baseline kafka input/output, which uses the sarama client library under the hood. Sarama has been great for the most part, but these odd issues keep piling up, and we're now at a point where, due to broken compatibility promises, we can't necessarily rely on upgrades even if fixes were in place (IBM/sarama#2358).

We therefore have two options: we can push forward with diagnosing these issues within sarama (if that's where they are) and then fork for fixes (or maybe they'll correct the breaking changes and we can upgrade), or we can try to get the kafka_franz input to reach feature parity with the kafka input. The new kafka_franz components use a client library (https://github.com/twmb/franz-go) that I have much more confidence in.

I'm leaning towards adding the bells and whistles to our kafka_franz input and then deprecating the old standard one.
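For anyone comparing the two inputs, a minimal kafka_franz equivalent of the input section from the original config might look like the following. This is only a sketch based on the kafka_franz field names (seed_brokers rather than addresses), not a tested config; at the time of this comment kafka_franz had neither partition selection nor input-level batching:

```yaml
input:
  label: "webrequest_sampled_in"
  kafka_franz:
    seed_brokers: [ "${KAFKA_BROKERS}" ]
    topics: [ "${KAFKA_TOPICS}" ]
    consumer_group: 'benthos-webrequest-sampled-live-test-elukey'
    start_from_oldest: false
    tls:
      enabled: true
      root_cas_file: /etc/ssl/certs/wmf-ca-certificates.crt
      skip_cert_verify: true
```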

Jeffail added the needs investigation label Mar 17, 2023
elukey commented Mar 18, 2023

@Jeffail makes sense! We checked kafka_franz in the past but found two important things missing:

  • Selecting the topic partitions to pull from (instead of all of them).
  • Batching when pulling from the Kafka topic (for better throughput when dealing with a ton of events per second, as in our case).

Is there a way to make the above two things work in the current Benthos implementation? If so we could definitely test it and report back!

mihaitodor added the kafka label Mar 18, 2023
twmb commented Mar 22, 2023

Select topic partitions to pull from

This can be done with ConsumePartitions

But also -- specifying partitions is unrelated to this issue, right?

batching

@elukey not sure what you mean w.r.t. batching -- I might be missing some Benthos context here. The franz-go client consumes through batches only (always -- all clients do). Edit: re-reading the above, this looks like the Benthos concept.

A few batching tuning knobs can be controlled with FetchMaxBytes, FetchMaxPartitionBytes, FetchMaxWait, FetchMinBytes, MaxConcurrentFetches. Also it's probably worth supporting the Rack option.

(this comment is about what can be added to Benthos for Benthos authors :D)

elukey commented Mar 22, 2023

Hi!

Select topic partitions to pull from

This can be done with ConsumePartitions

But also -- specifying partitions is unrelated to this issue, right?

In theory it is related, yes: without specifying partitions the kafka client behaves as expected, but as soon as we apply a range we start seeing the issues described above.

batching

@elukey not sure what you mean w.r.t. batching -- I might be missing some Benthos context here. The franz-go client consumes through batches only (always -- all clients do). Edit: re-reading the above, this looks like the Benthos concept.

A few batching tuning knobs can be controlled with FetchMaxBytes, FetchMaxPartitionBytes, FetchMaxWait, FetchMinBytes, MaxConcurrentFetches. Also it's probably worth supporting the Rack option.

(this comment is about what can be added to Benthos for Benthos authors :D)

I didn't see it specified in the kafka_franz documentation, whereas there is a specific batching option in the kafka one; this is why I was asking :)

Jeffail commented Mar 22, 2023

Yeah, this is a limitation of the features Benthos exposes rather than of the underlying franz-go library. @twmb we have everything we need; I just need to add a bit of a fork in our plugin to use explicit partitions rather than balancing.

elukey commented Apr 26, 2023

@Jeffail hi! Checking in again to see if there is any news :)

Jeffail commented Apr 28, 2023

Hey @elukey, I've just added partition consumption to the kafka_franz input: 10e67f1. Batching is next, but in the meantime you might be able to get the performance you need by putting it within a broker input and using the batching mechanism that has. It won't be as good, as it'll lump all partitions together.
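A sketch of the suggested workaround, assuming the broker input's top-level batching field (which batches the merged stream from all child inputs) and assuming the partition-range syntax mirrors the one used by the kafka input in the original config:

```yaml
input:
  broker:
    inputs:
      - kafka_franz:
          seed_brokers: [ "${KAFKA_BROKERS}" ]
          topics: [ "webrequest_text:1-6" ]
          start_from_oldest: false
    batching:
      count: 2048
      period: 100ms
```

Note that, as mentioned above, batching at the broker level lumps all partitions together rather than batching per partition.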

Jeffail commented May 1, 2023

Batching added now: 463169c. For anyone finding this issue after experiencing similar problems, I urge you to try the kafka_franz input instead. My intention is to remove the beta flag soon and recommend it generally over the sarama-based ones. Eventually the sarama connectors will be deprecated, so I'm not sure it's worth digging into this issue any further. Closing for now.
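With batching now native to kafka_franz, the broker wrapper mentioned in the previous comment is no longer necessary. A hedged sketch combining a partition range with input-level batching (field names assumed from the kafka_franz docs, not a tested config):

```yaml
input:
  kafka_franz:
    seed_brokers: [ "${KAFKA_BROKERS}" ]
    topics: [ "webrequest_text:1-6" ]
    start_from_oldest: false
    batching:
      count: 2048
      period: 100ms
```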

Jeffail closed this as completed May 1, 2023
wmfgerrit pushed a commit to wikimedia/operations-puppet that referenced this issue May 11, 2023
In redpanda-data/connect#1806 the upstream
devs added support for batching and selecting topic partitions in
kafka_franz, a new component that seems preferred over the kafka
one. I tested the new Benthos version 4.15.0 on stat1004 and
it worked nicely.

The idea is to replace the kafka component/client with kafka_franz,
see how it goes and then reduce the topic partitions (and adjust
sampling) later on.

Bug: T331801
Change-Id: Ib2c05e3dcd0632ff512ca382e269e8b6a720b591
elukey commented May 12, 2023

@Jeffail thanks a lot! We tested the new kafka_franz input/output and it works really well. One caveat: when we migrated from kafka to kafka_franz the consumer group name needed to be changed, since the following error was emitted:

INCONSISTENT_GROUP_PROTOCOL: The group member's supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list.

We expected something similar, but a reference in the docs could be good for people attempting the same migration.

We also discovered that limiting the topic partitions to consume from doesn't work together with a consumer group setting in kafka_franz. We rely on the consumer group feature to run more than one Benthos instance across datacenters, getting automatic failover in case one instance goes down, etc. (and also a good automatic split of traffic among instances). What kind of consumer is used when a consumer group is not specified?

twmb commented May 19, 2023

  • The franz-go client's default group balancer is cooperative-sticky, while I think Sarama's default is range / roundrobin (not exactly positive) -- it's incompatible. You'd need to configure the balancer to be the same if you want to seamlessly migrate.

  • With consumer groups, you can't control which client processes which partition. If that exists in the Sarama endpoint, I'm not sure how they're doing it because, well, it's just not possible without a custom balancer. If no group is specified, my guess is that Benthos uses a direct consumer: no consumer group, the client just consumes the topics or partitions it's given.

Does the Sarama (default kafka) endpoint have the ability to specify which exact partitions to consume in a group?

elukey commented May 22, 2023

  • The franz-go client's default group balancer is cooperative-sticky, while I think Sarama's default is range / roundrobin (not exactly positive) -- it's incompatible. You'd need to configure the balancer to be the same if you want to seamlessly migrate.

Yep, I was suggesting adding this gotcha to the documentation since it is not super easy to discover.

  • With consumer groups, you can't control which client processes which partition. If that exists in the Sarama endpoint, I'm not sure how they're doing it because, well, it's just not possible without a custom balancer. If no group is specified, my guess is that Benthos uses a direct consumer: no consumer group, the client just consumes the topics or partitions it's given.

Does the Sarama (default kafka) endpoint have the ability to specify which exact partitions to consume in a group?

In theory yes, but we had a lot of problems when we tried, so we moved to kafka_franz. Not a big deal if we can't; it would just be nice to reduce the amount of data pulled from huge topics :)
