From fca9bcae8caa690bacf39da79955a1782f6b3635 Mon Sep 17 00:00:00 2001 From: mateusjunges Date: Wed, 16 Feb 2022 13:37:35 -0300 Subject: [PATCH 1/3] remove producer config property from `getConsumerOptions` method --- src/Config/Config.php | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Config/Config.php b/src/Config/Config.php index 16108033..551ee680 100644 --- a/src/Config/Config.php +++ b/src/Config/Config.php @@ -67,7 +67,6 @@ public function getConsumerOptions(): array 'metadata.broker.list' => $this->broker, 'auto.offset.reset' => config('kafka.offset_reset', 'latest'), 'enable.auto.commit' => config('kafka.auto_commit', true) === true ? 'true' : 'false', - 'compression.codec' => config('kafka.compression', 'snappy'), 'group.id' => $this->groupId, 'bootstrap.servers' => $this->broker, ]; From 154428b57aa7f95e899a938fd510f6a476cc62e4 Mon Sep 17 00:00:00 2001 From: mateusjunges Date: Wed, 16 Feb 2022 13:41:48 -0300 Subject: [PATCH 2/3] fix tests --- tests/Config/ConfigTest.php | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/Config/ConfigTest.php b/tests/Config/ConfigTest.php index fc9dfd1a..8e3838dc 100644 --- a/tests/Config/ConfigTest.php +++ b/tests/Config/ConfigTest.php @@ -25,7 +25,6 @@ public function testItReturnsDefaultKafkaConfiguration() $expectedOptions = [ 'auto.offset.reset' => 'latest', 'enable.auto.commit' => 'true', - 'compression.codec' => 'snappy', 'group.id' => 'group', 'bootstrap.servers' => 'broker', 'metadata.broker.list' => 'broker', From 9fe50d6f548aed88283aaef22a403f0527ce9204 Mon Sep 17 00:00:00 2001 From: mateusjunges Date: Wed, 16 Feb 2022 13:58:00 -0300 Subject: [PATCH 3/3] reject consumer options in producers and producer options in consumer --- src/Config/Config.php | 57 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 55 insertions(+), 2 deletions(-) diff --git a/src/Config/Config.php b/src/Config/Config.php index 551ee680..a057e629 100644 --- a/src/Config/Config.php +++ b/src/Config/Config.php @@ -9,6 +9,55 @@ class Config { const SASL_PLAINTEXT = 'SASL_PLAINTEXT'; const SASL_SSL = 'SASL_SSL'; + const PRODUCER_ONLY_CONFIG_OPTIONS = [ + 'transactional.id', + 'transaction.timeout.ms', + 'enable.idempotence', + 'enable.gapless.guarantee', + 'queue.buffering.max.messages', + 'queue.buffering.max.kbytes', + 'queue.buffering.max.ms', + 'linger.ms', + 'message.send.max.retries', + 'retries', + 'retry.backoff.ms', + 'queue.buffering.backpressure.threshold', + 'compression.codec', + 'compression.type', + 'batch.num.messages', + 'batch.size', + 'delivery.report.only.error', + 'dr_cb', + 'dr_msg_cb', + 'sticky.partitioning.linger.ms' + ]; + const CONSUMER_ONLY_CONFIG_OPTIONS = [ + 'partition.assignment.strategy', + 'session.timeout.ms', + 'heartbeat.interval.ms', + 'group.protocol.type', + 'coordinator.query.interval.ms', + 'max.poll.interval.ms', + 'enable.auto.commit', + 'auto.commit.interval.ms', + 'enable.auto.offset.store', + 'queued.min.messages', + 'queued.max.messages.kbytes', + 'fetch.wait.max.ms', + 'fetch.message.max.bytes', + 'max.partition.fetch.bytes', + 'fetch.max.bytes', + 'fetch.min.bytes', + 'fetch.error.backoff.ms', + 'offset.store.method', + 'isolation.level', + 'consume_cb', + 'rebalance_cb', + 'offset_commit_cb', + 'enable.partition.eof', + 'check.crcs', + 'allow.auto.create.topics' + ]; public function __construct( private string $broker, @@ -75,7 +124,9 @@ public function getConsumerOptions(): array $options['enable.auto.commit'] = $this->autoCommit === true ? 'true' : 'false'; } - return array_merge($options, $this->customOptions, $this->getSaslOptions()); + return collect(array_merge($options, $this->customOptions, $this->getSaslOptions())) + ->reject(fn ($option) => in_array($option, self::PRODUCER_ONLY_CONFIG_OPTIONS)) + ->toArray(); } #[Pure] @@ -87,7 +138,9 @@ public function getProducerOptions(): array 'metadata.broker.list' => $this->broker, ]; - return array_merge($config, $this->customOptions, $this->getSaslOptions()); + return collect(array_merge($config, $this->customOptions, $this->getSaslOptions())) + ->reject(fn ($option) => in_array($option, self::CONSUMER_ONLY_CONFIG_OPTIONS)) + ->toArray(); } #[Pure]