
Heap OOM issue when trying to sink a Debezium snapshot to GCS #259

Open
mkherlakian opened this issue Aug 2, 2022 · 2 comments

@mkherlakian

I'm running into an issue where the connector's JVM exits with an OOM error when sinking a topic freshly snapshotted from a PG database through Debezium. The setup:

  • 2 topics, topic A with 500k messages, topic B with 200
  • Avro format for key and value
  • Source connector produces messages with no issues; messages make it into Kafka
  • Topic A message size is similar to topic B's - total size of topic A is 214 KB, total size of topic B is 180 MB
  • file name template config is "file.name.template": "{{topic}}/{{partition}}-{{start_offset}}-{{timestamp:unit=yyyy}}{{timestamp:unit=MM}}{{timestamp:unit=dd}}-{{timestamp:unit=HH}}.parquet.gz"
  • output format is parquet

This is all running on Aiven.

Topic A successfully sinks into GCS. The parquet file gets uploaded and all the data that we expect is there. Topic B consistently runs OOM.

We've tried a variety of values for file.max.records, ranging from 50 to 1000, and for offset.flush.interval.ms, the lowest being 50 ms, but we still experience the OOMs.

Part of the issue, we believe, comes from the fact that since this starts with a PG snapshot, the timestamps are all within an hour of each other for the ~1M records already in the topic. The connector's grouping logic would therefore consider the entire topic's content to be part of a single group - and if the GCS connector behaves the same as the S3 one, this could be an indication: https://help.aiven.io/en/articles/4775651-kafka-outofmemoryerror-exceptions. However, we would have expected file.max.records to compensate for this.
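To illustrate the suspected grouping behavior, here is a minimal sketch (our own code, not the connector's; the function and record names are made up) of how a timestamp-based filename template can collapse an entire snapshot into one output group when every record's timestamp falls within the same hour:

```python
# Hypothetical model of filename-template grouping: everything in the
# template except start_offset resolves to the same value for records
# whose timestamps share the same hour.
from collections import defaultdict
from datetime import datetime, timezone

def group_key(topic, partition, ts_ms):
    # Mirrors {{topic}}/{{partition}}-...-{{timestamp:unit=HH}}:
    # records in the same topic/partition/hour share one key.
    t = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
    return (topic, partition, t.strftime("%Y%m%d-%H"))

# Simulated snapshot: 500 records, all produced within milliseconds.
records = [("pg.public.B", 0, 1_659_400_000_000 + i) for i in range(500)]

groups = defaultdict(list)
for topic, partition, ts in records:
    groups[group_key(topic, partition, ts)].append(ts)

print(len(groups))  # -> 1: the whole snapshot maps to a single group
```

If the connector buffers a whole group in memory before writing, a snapshot like this would explain the heap pressure regardless of topic size on disk.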

Also, while upgrading plans is an option, we'd like to understand what knobs to turn to control memory utilization. Full cleaned-up config attached:

{
  "name": "gcs",
  "connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
  "tasks.max": "1",

  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "https://<karapace-service>.aivencloud.com:10034",
  "key.converter.basic.auth.credentials.source": "USER_INFO",
  "key.converter.schema.registry.basic.auth.user.info": "<user:pass>",

  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "https://<karapace>.aivencloud.com:10034",
  "value.converter.basic.auth.credentials.source": "USER_INFO",
  "value.converter.schema.registry.basic.auth.user.info": "<user:pass>",

  "topics": "pg.public.A,pg.public.B",

  "gcs.credentials.json": "<GCP_CREDENTIALS>",
  "gcs.bucket.name": "data-lake",
  "file.name.template": "{{topic}}/{{partition}}-{{start_offset}}-{{timestamp:unit=yyyy}}{{timestamp:unit=MM}}{{timestamp:unit=dd}}-{{timestamp:unit=HH}}.parquet.gz",
  
  "file.compression.type": "gzip",
  "file.max.records": "200",
  "format.output.type": "parquet",
  "format.output.fields": "key,offset,timestamp,value",

  "offset.flush.interval.ms": 50
}

(We tried different values for offset.flush.interval.ms; none seem to have an effect.)
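For reference, this is the chunking we assumed file.max.records would apply within a single filename-template group (illustrative only; this is our expectation, not the connector's documented behavior):

```python
# Expected behavior: a group's records split into files of at most
# max_records each, bounding how much is buffered per output file.
def chunk(records, max_records):
    return [records[i:i + max_records]
            for i in range(0, len(records), max_records)]

# A 500k-record group with file.max.records=200 should yield 2500 files.
files = chunk(list(range(500_000)), 200)
print(len(files), len(files[0]))  # -> 2500 200
```

If the connector instead materializes the whole group before splitting, file.max.records would cap file size but not peak heap usage, which would match what we observe.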

Any insight into what might be happening?

@mkokho

mkokho commented Aug 3, 2022

Thank you, sounds like a tricky problem to reproduce.
Could you also share the configuration of the Kafka cluster where the GCS connector is running?

@mkherlakian
Author

@mkokho that might be a little trickier because it's fully managed by Aiven - here are all the details I have access to:

3-node cluster, 1 CPU per cluster, 600 GB storage - from the logs, it looks like the connectors start with a 768 MB heap. I believe that other than us increasing the max message size (we do have some rows that contain blobs), everything else is the default config.

We also tested with a dataset of 1M rows with no blobs, where the size per message is predictable, and ended up having the same issue...

@ahmedsobeh ahmedsobeh transferred this issue from Aiven-Open/gcs-connector-for-apache-kafka Aug 27, 2024