Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Key and partition selector #123

Open
wants to merge 5 commits into
base: master
Choose a base branch
from

Conversation

cluster28
Copy link

Add partition stamp for selecting the destination partition in Kafka.
Add key stamp for adding a key to kafka message

Those stamps can be added in a Messenger Middleware to choose the partition or add a key.

I didn't know the dependency versions to run the tests and i found that with this docker composer image the tests didn't throw any warning and are all ok

docker run --rm --interactive --tty --volume $PWD:/app composer:1.10.13 /bin/bash -c "composer install && vendor/bin/phpunit"

#53 (comment)

@cluster28
Copy link
Author

Ping @sroze

@cluster28
Copy link
Author

Ping @Nyholm

@@ -168,6 +170,16 @@ public function send(Envelope $envelope): Envelope
$producer->setTimeToLive($this->options['timeToLive']);
}

$partitionStamp = $envelope->last(PartitionStamp::class);
if (method_exists($interopMessage, 'setPartition') && !is_null($partitionStamp)) {
$interopMessage->setPartition($partitionStamp->getPartition());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What class do you expect here?
Ie, it is not Interop\Queue\Message, right?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @Nyholm

I expect to have a Enqueue\RdKafka\RdKafkaMessage

I don't remember why I choose to check the method instead of checking the class...

Copy link
Author

@cluster28 cluster28 May 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Nyholm do you prefer that?

$partitionStamp = $envelope->last(PartitionStamp::class);
if ($interopMessage instanceof RdKafkaMessage && !is_null($partitionStamp)) {
    $interopMessage->setPartition($partitionStamp->getPartition());
}

Copy link
Author

@cluster28 cluster28 Jun 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants