groupId is overridden by camel-k-integration using kafka consumer #2605

Closed
mmacphail opened this issue Aug 29, 2021 · 2 comments

Comments

@mmacphail
Contributor

Versions
Kubernetes: Docker for Desktop 1.19
Camel K client: Camel K Client 1.5.0

Given the following kamel installation:

kamel install --registry $REGISTRY --organization mmacphail --registry-secret regcred --maven-repository https://packages.confluent.io/maven/@id=confluent --maven-repository https://jitpack.io/@id=jitpack

And the following helloworld.groovy file:

def bootstrap = 'my-cluster-kafka-bootstrap.kafka:9092'
def topic = 'sensor'
def groupId = 'testcamelk'
def keyDeserializer = 'org.apache.kafka.common.serialization.StringDeserializer'
def valueDeserializer = 'io.confluent.kafka.serializers.KafkaAvroDeserializer'
def schemaRegistryURL = 'http://schema-registry-service.kafka:8081'

def kafka = "kafka:${topic}?brokers=${bootstrap}&groupId=${groupId}&autoOffsetReset=earliest&keyDeserializer=${keyDeserializer}&valueDeserializer=${valueDeserializer}&schemaRegistryURL=${schemaRegistryURL}&specificAvroReader=true"

from(kafka)
  .to('log:info?showBody=true&showHeaders=true')

When running helloworld, the groupId is always overridden at runtime by the string camel-k-integration.

See logs:

	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [my-cluster-kafka-bootstrap.kafka:9092]
	check.crcs = true
	client.dns.lookup = use_all_dns_ips
	client.id = consumer-camel-k-integration-2
	client.rack = 
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = true
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = camel-k-integration
	group.instance.id = null
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	internal.throw.on.fetch.stable.offset.unsupported = false
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 40000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class io.confluent.kafka.serializers.KafkaAvroDeserializer

I believe the provided groupId should be used.
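
To check the effective value without reading the whole consumer configuration dump, the group.id line can be filtered out and compared with the value set in the route. This is only a minimal sketch: the helper name effective_group_id is hypothetical, and the sample input is three lines copied from the log above rather than live output from the integration.

```shell
#!/bin/sh
# Hypothetical helper: read a Kafka consumer config dump on stdin and
# print the value of the exact key "group.id" (not "group.instance.id").
effective_group_id() {
  awk -F' = ' '
    { key = $1; gsub(/^[ \t]+/, "", key) }   # strip the leading tab/indent
    key == "group.id" { print $2; exit }
  '
}

# Value configured in helloworld.groovy.
expected='testcamelk'

# Sample lines taken from the log above; in practice the saved
# integration log would be piped in instead.
actual=$(effective_group_id <<'EOF'
	fetch.min.bytes = 1
	group.id = camel-k-integration
	group.instance.id = null
EOF
)

if [ "$actual" = "$expected" ]; then
  echo "group.id matches: $actual"
else
  echo "group.id mismatch: expected $expected, got $actual"
fi
```

With the sample lines above this reports a mismatch, which is the behaviour described in this issue.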

@oscerd
Contributor

oscerd commented Aug 30, 2021

This is caused by: apache/camel-quarkus#2901

It should be fixed once we upgrade to a Camel Quarkus release that includes the fix.

@mmacphail
Contributor Author

Thank you, I'll wait for the fix.
