Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka source and aws s3 sink not work #18658

Closed
waney316 opened this issue Sep 24, 2023 · 4 comments
Closed

kafka source and aws s3 sink not work #18658

waney316 opened this issue Sep 24, 2023 · 4 comments
Labels
type: bug A code related bug.

Comments

@waney316
Copy link

A note for the community

  • Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
  • If you are interested in working on this issue or have submitted a pull request, please leave a comment

Problem

My vector version is 0.32.1. When I use kafka as the source and AWS S3 as the sink, it doesn't seem to work because the number of kafka consumer groups is 0. I suspect that this is due to the use of buffer.type=disk in the sink。

    [sources.input-backup]
    type = "kafka"
    bootstrap_servers = "${KAFKA}"
    group_id = "group"
    topics = [ "^xxxx.*"]
    decoding.codec="json"
    auto_offset_reset = "latest"

    [sinks.bak]
    type = "aws_s3"
    healthcheck.enabled = false
    inputs = [ "input-backup" ]
    auth.access_key_id = "xxxxxxxxxx"
    auth.secret_access_key = "xxxxx"
    endpoint = "xxxxx"
    key_prefix = "{{ topic }}/%Y/%m/%d/"
    bucket = "xxx"
    region = "xxxxx"
    compression = "gzip"
    encoding.codec="json"
    framing.method   = "newline_delimited"    
    filename_time_format = "%Y%m%d%H%M%S%3f_archive"

    batch.max_bytes = 2073741824
    batch.timeout_secs = 3600
    buffer.type = "disk"
    buffer.max_size = 268435488 
    buffer.when_full  = "block"

Configuration

No response

Version

0.32.1

Debug Output

No response

Example Data

No response

Additional Context

No response

References

No response

@waney316 waney316 added the type: bug A code related bug. label Sep 24, 2023
@jszwedko
Copy link
Member

Hi @waney316 !

Can you clarify what you mean by "doesn't work". Are you seeing events consumed from Kafka (you could use vector tap or the internal metrics to check this). Is it attempting to send events to AWS S3? Your batch configuration is such that you may need to wait 10 minutes to see that flushing.

It should be unrelated, but given you are using Kafka I do think adding a disk buffer to the aws_s3 sink only introduces downsides and no upsides given Kafka is already providing durability.

Also, could you also add the debug logs here?

@waney316
Copy link
Author

waney316 commented Nov 30, 2023

Hi @waney316 !

Can you clarify what you mean by "doesn't work". Are you seeing events consumed from Kafka (you could use vector tap or the internal metrics to check this). Is it attempting to send events to AWS S3? Your batch configuration is such that you may need to wait 10 minutes to see that flushing.

It should be unrelated, but given you are using Kafka I do think adding a disk buffer to the aws_s3 sink only introduces downsides and no upsides given Kafka is already providing durability.

Also, could you also add the debug logs here?

tks @jszwedko ,It seems like you're encountering an issue where, under buffer.type = "disk", the buffer size remains unchanged. Additionally, Vector's top command shows that there are no incoming events from the Kafka source (source=input-kafka-backup), and the sink (Sinks=bak-to-cos) is not updating. I also noticed an error in Vector's logs: consumption error: PollExceeded (Local: Maximum application poll interval (max.poll.interval.ms) exceeded. What is the issue here? The following is a snippet of my configuration.

[sources.input-kafka-backup]
type = "kafka"
bootstrap_servers = "${KAFKA}"
group_id = "xxxxxx"
topics = [ "^xxxxxx.*" ]
decoding.codec="json"
auto_offset_reset = "latest"

[sinks.bak-to-cos]
type = "aws_s3"
healthcheck.enabled = false
inputs = [ "input-kafka-backup" ]
auth.access_key_id = ""
auth.secret_access_key = ""
endpoint = "https://xxxxx.com"
key_prefix = "{{ topic }}/%Y/%m/%d/"
bucket = "xxxxxx"
region = "xxxxx"
compression = "gzip"
encoding.codec="json"
framing.method   = "newline_delimited"    
filename_time_format = "%Y%m%d%H%M%S%3f_archive"

batch.max_bytes = 209715200 
batch.timeout_secs = 900
buffer.type = "disk"
buffer.max_size = 734003200 
buffer.when_full = "block"
request.concurrency= "adaptive"

image
image
image

@jszwedko
Copy link
Member

Hi @waney316 ,

It seems like Vector is failing to fetch events from Kafka. I would check:

  • Connectivity between Vector and Kafka (e.g. can you run kafkacat from the same host or container)
  • If connectivity is verified, I would check the Kafka broker logs for additional clues

@jszwedko
Copy link
Member

Closing due to lack of response.

@jszwedko jszwedko closed this as not planned Won't fix, can't repro, duplicate, stale Feb 28, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type: bug A code related bug.
Projects
None yet
Development

No branches or pull requests

2 participants