Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use application name as default Kafka consumer group.id #15678

Merged
merged 1 commit into from
Mar 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,12 @@ Type: _string_ | true |

Type: _int_ | false | `1`

| *group.id* | A unique string that identifies the consumer group the application belongs to. If not set, a unique, generated id is used
| *group.id* | A unique string that identifies the consumer group the application belongs to.

If not set, defaults to the application name as set by the `quarkus.application.name` configuration property.

If that is not set either, a unique, generated id is used.
Ladicek marked this conversation as resolved.
Show resolved Hide resolved
It is recommended to always define a `group.id`, the automatic generation is only a convenient feature for development.

Type: _string_ | false |

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,40 @@
import org.eclipse.microprofile.config.ConfigProvider;

import io.quarkus.arc.DefaultBean;
import io.quarkus.runtime.ApplicationConfig;

@Dependent
public class KafkaRuntimeConfigProducer {

private String configPrefix = "kafka";
// not "kafka.", because we also inspect env vars, which start with "KAFKA_"
private static final String CONFIG_PREFIX = "kafka";

private static final String GROUP_ID = "group.id";

@Produces
@DefaultBean
@ApplicationScoped
@Named("default-kafka-broker")
public Map<String, Object> createKafkaRuntimeConfig() {
public Map<String, Object> createKafkaRuntimeConfig(ApplicationConfig app) {
Map<String, Object> properties = new HashMap<>();
final Config config = ConfigProvider.getConfig();

StreamSupport
.stream(config.getPropertyNames().spliterator(), false)
.map(String::toLowerCase)
.filter(name -> name.startsWith(configPrefix))
.filter(name -> name.startsWith(CONFIG_PREFIX))
.distinct()
.sorted()
.forEach(name -> {
final String key = name.substring(configPrefix.length() + 1).toLowerCase().replaceAll("[^a-z0-9.]", ".");
final String key = name.substring(CONFIG_PREFIX.length() + 1).toLowerCase().replaceAll("[^a-z0-9.]", ".");
final String value = config.getOptionalValue(name, String.class).orElse("");
properties.put(key, value);
});

if (!properties.containsKey(GROUP_ID) && app.name.isPresent()) {
properties.put(GROUP_ID, app.name.get());
}

return properties;
}

Expand Down