Skip to content

Commit

Permalink
add: flushOptions that are configurable both on global (config) level…
Browse files Browse the repository at this point in the history
… and for individual producers [fixes mateusjunges#300]
  • Loading branch information
Alexander (SASh) Alexiev committed Aug 23, 2024
1 parent d15752b commit 41ee00e
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 2 deletions.
10 changes: 10 additions & 0 deletions config/kafka.php
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@
*/
'flush_retry_sleep_in_ms' => 100,

/*
* The number of retries that will be used when flushing the producer
*/
'flush_retries' => 10,

/**
* The flush timeout in milliseconds
*/
'flush_timeout_in_ms' => 1000,

/*
| The cache driver that will be used
*/
Expand Down
2 changes: 2 additions & 0 deletions src/Config/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ public function __construct(
private readonly int $maxTime = 0,
private readonly array $partitionAssignment = [],
private readonly ?Closure $whenStopConsuming = null,
public readonly ?int $flushRetries = null,
public readonly ?int $flushTimeoutInMs = null,
) {
}

Expand Down
2 changes: 2 additions & 0 deletions src/Contracts/MessageProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public function withMessage(ProducerMessage $message): self;
/** Set Sasl configuration. */
public function withSasl(string $username, string $password, string $mechanisms, string $securityProtocol = 'SASL_PLAINTEXT'): self;

public function withFlushOptions(int $retries, int $timeoutInMs): self;

/** Specifies which serializer should be used. */
public function usingSerializer(MessageSerializer $serializer): self;

Expand Down
12 changes: 12 additions & 0 deletions src/Producers/Builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ class Builder implements MessageProducer
private readonly string $broker;
private bool $isTransactionProducer = false;
private int $maxTransactionRetryAttempts = 5;
private ?int $flushRetries = null;
private ?int $flushTimeoutInMs = null;

public function __construct(
?string $broker = null,
Expand Down Expand Up @@ -158,6 +160,14 @@ public function withDebugDisabled(): self
return $this->withDebugEnabled(false);
}

public function withFlushOptions(int $retries, int $timeoutInMs): self
{
$this->flushRetries = $retries;
$this->flushTimeoutInMs = $timeoutInMs;

return $this;
}

/**
* Send the given message to Kakfa.
*
Expand Down Expand Up @@ -204,6 +214,8 @@ public function build(): Producer
sasl: $this->saslConfig,
customOptions: $this->options,
callbacks: $this->callbacks,
flushRetries: $this->flushRetries,
flushTimeoutInMs: $this->flushTimeoutInMs,
);

$producer = app(Producer::class, [
Expand Down
6 changes: 4 additions & 2 deletions src/Producers/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,12 @@ public function flush(): mixed
// after sending all messages with Producer::sendBatch
$flush = function () {
$sleepMilliseconds = config('kafka.flush_retry_sleep_in_ms', 100);
$retries = $this->config->flushRetries ?? config('kafka.flush_retries', 10);
$timeout = $this->config->flushTimeoutInMs ?? config('kafka.flush_timeout_in_ms', 1000);

try {
return retry(10, function () {
$result = $this->producer->flush(1000);
return retry($retries, function () use ($timeout) {
$result = $this->producer->flush($timeout);

if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
return true;
Expand Down
12 changes: 12 additions & 0 deletions src/Support/Testing/Fakes/ProducerBuilderFake.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ class ProducerBuilderFake implements MessageProducer
private ?Sasl $saslConfig = null;
private string $topic = '';
private ?Closure $producerCallback = null;
private ?int $flushRetries = null;
private ?int $flushTimeoutInMs = null;

public function __construct(
private readonly ?string $broker = null,
Expand Down Expand Up @@ -145,6 +147,14 @@ public function withSasl(string $username, string $password, string $mechanisms,
return $this;
}

public function withFlushOptions(int $retries, int $timeoutInMs): self
{
$this->flushRetries = $retries;
$this->flushTimeoutInMs = $timeoutInMs;

return $this;
}

/** Specifies which serializer should be used. */
public function usingSerializer(MessageSerializer $serializer): MessageProducer
{
Expand Down Expand Up @@ -202,6 +212,8 @@ public function build(): ProducerFake
sasl: $this->saslConfig,
customOptions: $this->options,
callbacks: $this->callbacks,
flushRetries: $this->flushRetries,
flushTimeoutInMs: $this->flushTimeoutInMs,
);

return $this->makeProducer($conf);
Expand Down

0 comments on commit 41ee00e

Please sign in to comment.