Skip to content

Commit

Permalink
Merge pull request #1053 from ayeshLK/kafkahub-mtls-dev2
Browse files Browse the repository at this point in the history
Add support for retrying for message delivery errors
  • Loading branch information
ayeshLK authored Nov 20, 2024
2 parents d0745db + d4dc22d commit 7567770
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 1 deletion.
2 changes: 2 additions & 0 deletions examples/kafka-hub/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ services:
KEYSTORE_PASSWORD: "password"
# Maximum number of records returned in a single call to consumer-poll
KAFKA_CONSUMER_MAX_POLL_RECORDS: 50
# The HTTP status codes for which the client should retry
RETRYABLE_STATUS_CODES: "500,502,503"
volumes:
# Kafka client truststore file
- ./_resources/secrets/kafka-client/kafka-client.trustStore.jks:/home/ballerina/resources/brokercerts/client-truststore.jks
Expand Down
3 changes: 3 additions & 0 deletions examples/kafka-hub/hub/Config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ MESSAGE_DELIVERY_COUNT = 3
# The message delivery timeout
MESSAGE_DELIVERY_TIMEOUT = 10.0

# The HTTP status codes for which the client should retry
MESSAGE_DELIVERY_RETRYABLE_STATUS_CODES = [500, 502, 503]

# The Oauth2 authorization related configurations
[kafkaHub.config.OAUTH2_CONFIG]
issuer = "https://localhost:9443/oauth2/token"
Expand Down
13 changes: 13 additions & 0 deletions examples/kafka-hub/hub/modules/config/configurations.bal
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ public configurable int MESSAGE_DELIVERY_COUNT = 3;
# The message delivery timeout
public configurable decimal MESSAGE_DELIVERY_TIMEOUT = 10;

# The HTTP status codes for which the client should retry
public configurable int[] MESSAGE_DELIVERY_RETRYABLE_STATUS_CODES = [500, 502, 503];

public final readonly & int[] RETRYABLE_STATUS_CODES = check getRetryableStatusCodes(MESSAGE_DELIVERY_RETRYABLE_STATUS_CODES).cloneReadOnly();

# The Oauth2 authorization related configurations
public configurable types:OAuth2Config OAUTH2_CONFIG = ?;

Expand All @@ -76,3 +81,11 @@ public final string WEBSUB_EVENTS_CONSUMER_GROUP = os:getEnv("WEBSUB_EVENTS_CONS
isolated function constructSystemConsumerGroup() returns string {
return string `websub-events-receiver-${SERVER_IDENTIFIER}-${util:generateRandomString()}`;
}

isolated function getRetryableStatusCodes(int[] configuredCodes) returns int[]|error {
if os:getEnv("RETRYABLE_STATUS_CODES") is "" {
return configuredCodes;
}
string[] statusCodes = re `,`.split(os:getEnv("RETRYABLE_STATUS_CODES"));
return statusCodes.'map(i => check int:fromString(i.trim()));
}
3 changes: 2 additions & 1 deletion examples/kafka-hub/hub/websub_subscribers.bal
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ isolated function pollForNewUpdates(string subscriberId, websubhub:VerifiedSubsc
interval: config:MESSAGE_DELIVERY_RETRY_INTERVAL,
count: config:MESSAGE_DELIVERY_COUNT,
backOffFactor: 2.0,
maxWaitInterval: 20
maxWaitInterval: 20,
statusCodes: config:RETRYABLE_STATUS_CODES
},
timeout: config:MESSAGE_DELIVERY_TIMEOUT
});
Expand Down

0 comments on commit 7567770

Please sign in to comment.