Updates Kafka configurations such that plugin has its own topic configurations #3551
…are not needed - schema and the serde_format for the topic configuration. As part of this change, I also split the TopicConfig into three distinct interfaces and classes. This allows each plugin to either accept a configuration or provide a value of the plugin's own choosing. Also adds copyright headers to all files modified as part of this commit. Signed-off-by: David Venable <[email protected]>
@JsonProperty("fetch_max_bytes")
private String fetchMaxBytes = DEFAULT_FETCH_MAX_BYTES;

@JsonProperty("fetch_max_wait")
@Valid
@Size(min = 1)
private Integer fetchMaxWait = DEFAULT_FETCH_MAX_WAIT;

@JsonProperty("fetch_min_bytes")
private String fetchMinBytes = DEFAULT_FETCH_MIN_BYTES;
I think a lot of these are not applicable to Producer. Things like `fetch_max_bytes`, `auto_commit`, etc. are consumer specific.
Thanks! I wasn't quite sure if I had caught them all or not.
… and buffer configurations. Also cleaned up the CommonTopicConfig somewhat. Adds a zeroBytes() static method to ByteCount as a convenience. Signed-off-by: David Venable <[email protected]>
@JsonProperty("retention_period")
private Long retentionPeriod = DEFAULT_RETENTION_PERIOD;

@JsonProperty("is_topic_create")
There is some confusion about the meaning of this property. If it controls whether the Kafka buffer should create the topic, then maybe it should be called createTopic.
I agree with this comment. I didn't change the name to avoid breaking anything, but this is only used in the sink and buffer, neither of which has been released. So I'll change this to `create_topic`.
…ernally instead of Boolean since it will have a value. Signed-off-by: David Venable <[email protected]>
 * An extension of the {@link TopicConfig} specifically for
 * producers to Kafka topics.
 */
public interface ProducerTopicConfig extends TopicConfig {
nit: ProducerTopicConfig and ConsumerTopicConfig give the impression that there are two different topics, one for producing and one for consuming data. Maybe TopicProducerConfig or TopicConfigProducer would be better, but not a blocker.
@hshardeesi, that suggestion makes sense, and I hadn't seen it from that perspective until you called it out. I went ahead and renamed the files.
…picConfig and ProducerTopicConfig. Signed-off-by: David Venable <[email protected]>
static final Duration DEFAULT_THREAD_WAITING_TIME = Duration.ofSeconds(5);
static final Duration DEFAULT_RETRY_BACKOFF = Duration.ofSeconds(10);
static final Duration DEFAULT_RECONNECT_BACKOFF = Duration.ofSeconds(10);
static final Duration DEFAULT_MAX_POLL_INTERVAL = Duration.ofSeconds(300);
MAX_POLL_INTERVAL also doesn't belong in the common config. The common config should be the intersection of the Producer and Consumer configs. We can get this info from https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html and https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html.
This is the common list between the two. At the topic level it looks like only the following two are common?
reconnect.backoff.ms
retry.backoff.ms
Things like bootstrap servers and auth (ssl, sasl, oauth configs) are common between source and sink. But at the topic level, very few are common.
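To make the distinction above concrete, here is a minimal sketch in plain Java that layers a consumer-only setting on top of the two shared ones. The property keys are standard Kafka client settings and the durations mirror the defaults quoted in the diff; the class and method names (`TopicPropertiesSketch`, `commonProperties`, `consumerProperties`) are hypothetical and not taken from the PR.

```java
import java.time.Duration;
import java.util.Properties;

// Hypothetical helper, not part of the PR: shows which topic-level settings
// can be shared and which belong to the consumer only.
final class TopicPropertiesSketch {

    // Settings accepted by both Kafka producers and consumers.
    static Properties commonProperties(final Duration retryBackoff, final Duration reconnectBackoff) {
        final Properties properties = new Properties();
        properties.setProperty("retry.backoff.ms", Long.toString(retryBackoff.toMillis()));
        properties.setProperty("reconnect.backoff.ms", Long.toString(reconnectBackoff.toMillis()));
        return properties;
    }

    // Consumer-only settings are layered on top of a copy of the common ones.
    static Properties consumerProperties(final Properties common, final Duration maxPollInterval) {
        final Properties properties = new Properties();
        properties.putAll(common);
        properties.setProperty("max.poll.interval.ms", Long.toString(maxPollInterval.toMillis()));
        return properties;
    }

    public static void main(final String[] args) {
        final Properties common = commonProperties(Duration.ofSeconds(10), Duration.ofSeconds(10));
        final Properties consumer = consumerProperties(common, Duration.ofSeconds(300));
        System.out.println("common:   " + common);
        System.out.println("consumer: " + consumer);
    }
}
```

Because the consumer properties are built from a copy, the common set never picks up `max.poll.interval.ms`, so a producer built from it would not see a consumer-only key.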
… along with the corresponding subclasses. Signed-off-by: David Venable <[email protected]>
@@ -61,7 +67,7 @@ public void setDlqConfig(final PluginSetting pluginSetting) {

 @JsonProperty("topic")
-TopicConfig topic;
+SinkTopicConfig topic;

 @JsonProperty("authentication")
 private AuthConfig authConfig;
I should have pointed this out previously. It looks like "AuthConfig", "AwsConfig", "bootstrapServers", and "SchemaConfig" are common between "KafkaSourceConfig" and "KafkaSinkConfig". Does it make sense to move these into a common interface and a common base class?
Same with getThreadWaitTime()/getThreadWaitingTime()
I think that is a good idea, but probably beyond the scope of this PR. The goal of this PR is to decouple the topic configurations as they do need to vary between the different plugins.
Description
Prior to this PR, all three Kafka plugins shared a single topic configuration class, `TopicConfig`. This is somewhat problematic because each plugin needs different topic configurations depending on what it is doing (producing, consuming, or both).
Some of the configuration changes:
- Removed `serde_format` from the Kafka buffer topic configuration. The Kafka buffer will always use bytes.
- Removed `schema` from the Kafka buffer configuration as it won't use a schema registry.
- Removed `number_of_partitions`, `replication_factor`, `retention_period`, and `is_topic_create` from the Kafka source's topic configuration.
- Removed `commit_interval` and `key_mode` from the Kafka sink's topic configuration.
As part of this change, I split the TopicConfig into three distinct interfaces and classes. This allows each plugin to either accept a configuration or provide a value of the plugin's own choosing.
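A minimal sketch of that idea, under simplifying assumptions: the interface names follow one of the naming options raised in review, while the methods, the `*Sketch` classes, and the string-valued serde format are hypothetical and far smaller than the real types in the PR. It shows a sink accepting `serde_format` from the user while a buffer supplies its own fixed value.

```java
// Hypothetical demo, not part of the PR.
final class TopicConfigSketchDemo {
    public static void main(final String[] args) {
        final TopicProducerConfig sink = new SinkTopicConfigSketch("orders", "json", true);
        final BufferTopicConfigSketch buffer = new BufferTopicConfigSketch("buffer-topic", true, 5000L);
        System.out.println(sink.getName() + " uses " + sink.getSerdeFormat());
        System.out.println(buffer.getName() + " uses " + buffer.getSerdeFormat());
    }
}

// Simplified shapes for illustration; the real interfaces expose many more settings.
interface TopicConfig {
    String getName();
}

// View of a topic for plugins that write to it.
interface TopicProducerConfig extends TopicConfig {
    String getSerdeFormat();
    boolean isCreateTopic();
}

// View of a topic for plugins that read from it.
interface TopicConsumerConfig extends TopicConfig {
    String getSerdeFormat();
    long getCommitIntervalMillis();
}

// A sink only produces, and it accepts the serde format from the user.
final class SinkTopicConfigSketch implements TopicProducerConfig {
    private final String name;
    private final String serdeFormat;
    private final boolean createTopic;

    SinkTopicConfigSketch(final String name, final String serdeFormat, final boolean createTopic) {
        this.name = name;
        this.serdeFormat = serdeFormat;
        this.createTopic = createTopic;
    }

    @Override public String getName() { return name; }
    @Override public String getSerdeFormat() { return serdeFormat; }
    @Override public boolean isCreateTopic() { return createTopic; }
}

// A buffer produces and consumes. It never reads a serde format from the
// user and supplies its own value instead.
final class BufferTopicConfigSketch implements TopicProducerConfig, TopicConsumerConfig {
    private final String name;
    private final boolean createTopic;
    private final long commitIntervalMillis;

    BufferTopicConfigSketch(final String name, final boolean createTopic, final long commitIntervalMillis) {
        this.name = name;
        this.createTopic = createTopic;
        this.commitIntervalMillis = commitIntervalMillis;
    }

    @Override public String getName() { return name; }
    @Override public String getSerdeFormat() { return "bytes"; } // fixed by the plugin
    @Override public boolean isCreateTopic() { return createTopic; }
    @Override public long getCommitIntervalMillis() { return commitIntervalMillis; }
}
```

Code that creates producers or consumers can then depend only on the interface it needs, without knowing whether a given value came from the pipeline configuration or from the plugin itself.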
This PR also adds copyright headers to all files modified as part of this commit.
Issues Resolved
None, but contributes toward #3322.
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.