Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Allow kafka consumer group id prefix configuration #5072

Merged
merged 7 commits into from
Aug 18, 2023
Merged

feat: Allow kafka consumer group id prefix configuration #5072

merged 7 commits into from
Aug 18, 2023

Conversation

ukclivecox
Copy link
Contributor

What this PR does / why we need it:
Allows for customization of the Kafka consumer group id prefix.

Which issue(s) this PR fixes:
Fixes #5063

@ukclivecox ukclivecox added the v2 label Aug 8, 2023
@ukclivecox ukclivecox requested a review from agrski August 8, 2023 14:59
@ukclivecox ukclivecox changed the title Allow kafka consumer group id prefix configuration feat: Allow kafka consumer group id prefix configuration Aug 9, 2023
@review-notebook-app
Copy link

Check out this pull request on  ReviewNB

See visual diffs & provide feedback on Jupyter Notebooks.


Powered by ReviewNB

Copy link
Contributor

@agrski agrski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some recommended changes, although this looks to be functionally correct

scheduler/pkg/kafka/pipeline/consumer_manager_test.go Outdated Show resolved Hide resolved
scheduler/pkg/kafka/pipeline/consumer_manager_test.go Outdated Show resolved Hide resolved
scheduler/pkg/util/hash_test.go Show resolved Hide resolved
scheduler/pkg/util/hash_test.go Outdated Show resolved Hide resolved
samples/k8s-clusterwide.md Show resolved Hide resolved
@@ -31,7 +31,7 @@ import java.nio.file.Files
object K8sCertSecretsProvider {

private val kubeConfigPath: String = System.getenv("HOME") + "/.kube/config"
private val namespace = System.getenv("POD_NAMESPACE")
private val namespace = System.getenv("SELDON_POD_NAMESPACE")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💭 I'll put in a small follow-up PR to move these to use the CLI-provided value

@@ -25,6 +25,7 @@ import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.Topology
import java.util.concurrent.CountDownLatch
import javax.xml.stream.events.Namespace
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔧 Accidental import?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will remove

@@ -72,7 +74,7 @@ class PipelineSubscriber(
suspend fun subscribe() {
logger.info("will connect to ${upstreamHost}:${upstreamPort}")
retry(grpcFailurePolicy + binaryExponentialBackoff(50..5_000L)) {
subscribePipelines()
subscribePipelines(kafkaConsumerGroupIdPrefix, namespace)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🙃 No need to pass these parameters explicitly as they've private vals above; happy to change in a follow-up PR for expediency

Copy link
Contributor

@agrski agrski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only minor comments outstanding; approving and will create a follow-up for a few tidy-ups as indicated in comments

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants