Update Kafka config dynamically #4316

Merged (3 commits), Jul 30, 2021

@bsideup bsideup commented Jul 30, 2021

This PR dramatically simplifies our Kafka integration by updating Kafka's config on the fly, so that we don't need to generate the entry point script at all.

It may also make the integration more compatible with M1, but that needs to be tested separately.
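For readers unfamiliar with updating a broker's config at runtime: Kafka ships a `kafka-configs` tool that can alter per-broker settings such as `advertised.listeners` while the broker is running. The excerpts in this conversation do not show the exact call the PR makes, so the following is only a sketch of how such a command line could be assembled. The class name, method name, broker id, and hostname are all hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Sketch only: DynamicConfigSketch and alterAdvertisedListeners are
// hypothetical names, not part of Testcontainers or of this PR.
final class DynamicConfigSketch {

    // Builds an argument list for Kafka's kafka-configs tool that sets
    // advertised.listeners on a single broker at runtime.
    static List<String> alterAdvertisedListeners(String bootstrapServer, String brokerId, String listeners) {
        return Arrays.asList(
            "kafka-configs",
            "--alter",
            "--bootstrap-server", bootstrapServer,
            "--entity-type", "brokers",
            "--entity-name", brokerId,
            // Square brackets let the value contain commas (several listeners)
            "--add-config", "advertised.listeners=[" + listeners + "]"
        );
    }

    public static void main(String[] args) {
        // Assumed values, for illustration only
        List<String> command = alterAdvertisedListeners("localhost:9092", "1", "BROKER://kafka-1:9092");
        System.out.println(String.join(" ", command));
    }
}
```

Returning the arguments as a list rather than one string avoids shell quoting issues if the list is later passed to a process or container exec call.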

kiview commented Jul 30, 2021

Although I did not expect otherwise, I just wanted to add that it also works with Docker for Windows on the WSL backend.

@kiview kiview left a comment

Cluster example is failing.

@bsideup bsideup requested a review from kiview July 30, 2021 15:48

bsideup commented Jul 30, 2021

@kiview fixed 🎉

@@ -135,7 +135,7 @@
     @NonNull
     private String networkMode;
 
-    @NonNull
+    @Nullable
Member Author

The annotation was wrong: we do allow resetting the network to null (no network).

Comment on lines -138 to +126
-        // Kafka supports only one INTER_BROKER listener, so we have to pick one.
-        // The current algorithm uses the following order of resolving the IP:
-        // 1. Custom network's IP set via `withNetwork`
-        // 2. Bridge network's IP
-        // 3. Best effort fallback to getNetworkSettings#ipAddress
-        String ipAddress = containerInfo.getNetworkSettings().getNetworks().entrySet()
-            .stream()
-            .filter(it -> it.getValue().getIpAddress() != null)
-            .max(Comparator.comparingInt(entry -> {
-                if (getNetwork() != null && getNetwork().getId().equals(entry.getValue().getNetworkID())) {
-                    return 2;
-                }
-
-                if ("bridge".equals(entry.getKey())) {
-                    return 1;
-                }
-
-                return 0;
-            }))
-            .map(it -> it.getValue().getIpAddress())
-            .orElseGet(() -> containerInfo.getNetworkSettings().getIpAddress());
-
-        return String.format("BROKER://%s:%s", ipAddress, "9092");
+        return String.format("BROKER://%s:%s", containerInfo.getConfig().getHostName(), "9092");
Member Author

This was super fun to discover, but apparently we over-engineered this: hostName is enough to make it work reliably. It works in both the "no network" (a.k.a. "bridge only") and multi-network scenarios, as containers are visible by their hostname to other containers in the same network 😂

Comment on lines -127 to +104
-        command += ". /etc/confluent/docker/bash-config \n";
-        command += "/etc/confluent/docker/configure \n";
-        command += "/etc/confluent/docker/launch \n";
+        // Optimization: skip the checks
+        command += "echo '' > /etc/confluent/docker/ensure \n";
+        // Run the original command
+        command += "/etc/confluent/docker/run \n";
+        withCommand("sh", "-c", command);
Member Author

This change is a tiny bit unrelated, but while looking for ways to simplify things I figured that we could delegate to the original CMD of Confluent's Kafka image and optimize it by removing the ensure step. That step adds ~2s because it needs to instantiate a ZooKeeper client, which is not really necessary in our case, as we have our own checks.

Member

I really like this change.
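The startup command discussed in this thread can be sketched in isolation as follows. This is a minimal illustration that only assembles the shell script text; the two script paths come from the diff excerpt, while the class and method names are hypothetical and nothing here starts a container.

```java
// Sketch only: StartupCommandSketch and buildCommand are hypothetical names.
final class StartupCommandSketch {

    // Assembles the shell script passed to `sh -c`.
    static String buildCommand() {
        StringBuilder command = new StringBuilder();
        // Overwrite the image's "ensure" script so its checks are skipped
        command.append("echo '' > /etc/confluent/docker/ensure \n");
        // Delegate to the image's original run script
        command.append("/etc/confluent/docker/run \n");
        return command.toString();
    }

    public static void main(String[] args) {
        System.out.print(buildCommand());
    }
}
```

The order matters: the ensure script has to be emptied before the run script is invoked, otherwise the checks would still execute.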

Comment on lines +78 to +86
withEnv(
"KAFKA_ADVERTISED_LISTENERS",
String.format(
"BROKER://%s:9092",
getNetwork() != null
? getNetworkAliases().get(0)
: "localhost"
)
);
@bsideup bsideup Jul 30, 2021

if set to just "localhost", clustered Kafka would fail, because There Can Be Only One Localhost ™
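The host selection in the snippet above can be illustrated with a small standalone sketch. It mirrors the ternary from the diff (first network alias when a network is set, otherwise "localhost"), but the class name, method name, and the `hasNetwork` flag are hypothetical stand-ins for the container's own `getNetwork()` and `getNetworkAliases()` calls.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Sketch only: AdvertisedListenerSketch and advertisedListener are
// hypothetical names used for illustration.
final class AdvertisedListenerSketch {

    // Picks the first network alias when the container is attached to a
    // custom network, and falls back to "localhost" otherwise.
    static String advertisedListener(boolean hasNetwork, List<String> networkAliases) {
        String host = hasNetwork ? networkAliases.get(0) : "localhost";
        return String.format("BROKER://%s:9092", host);
    }

    public static void main(String[] args) {
        // Two brokers on a shared network advertise distinct hosts
        System.out.println(advertisedListener(true, Arrays.asList("kafka-1")));
        System.out.println(advertisedListener(true, Arrays.asList("kafka-2")));
        // A single broker without a network advertises localhost
        System.out.println(advertisedListener(false, Collections.<String>emptyList()));
    }
}
```

With a shared network, each broker advertises its own alias, so the brokers in a cluster no longer all claim to be "localhost".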

bsideup commented Jul 30, 2021

Pinging @jstastny-cz / @gunnarmorling / @gmunozfe who reported #3932.

While we haven't added a base implementation that can be used (yet?), the new implementation is much simpler, and you may want to follow it. Also, I tried it with Strimzi, but then figured that, unlike Confluent's Kafka, Strimzi does not seem to support KAFKA_ environment variables, and we heavily rely on them :(

@kiview kiview left a comment

Great further changes. Note that the failing jobs are possibly directly related to #4315, since they seem to be Selenium tests (so of course totally unrelated to this PR).

bsideup commented Jul 30, 2021

FYI M1 still fails with:

qemu: uncaught target signal 11 (Segmentation fault) - core dumped

:(

@bsideup bsideup merged commit 4df9b90 into master Jul 30, 2021
@bsideup bsideup deleted the update_kafka_config_dynamically branch July 30, 2021 21:57