Skip to content

Commit

Permalink
fix: pull queries should work across nodes (confluentinc#4169)
Browse files Browse the repository at this point in the history
Backport of confluentinc#4169

Fixes: confluentinc#4142
Fixes: confluentinc#4151
Fixes: confluentinc#4152

Introduces a new `inter.node.listener` that can be used to specify a URL that the node can be contacted on by other nodes.  This can be different to the listeners defined in `listeners`. This can be required if `listeners` is set to a wildcard address, i.e. IPv4 `0.0.0.0` or IPv6 `[::]`, or if the node sits behind network infrastructure that requires other nodes to reach it using a different URL.

If `inter.node.listener` is not set it still defaults to the first listener in `listener` config. However, it now replaces an wildcard address with `localHost`. This means inter-node comms is still possible for nodes running on the same host.

Warnings are logged if the inter-node listener resolves to a loopback or local address.

(cherry picked from commit 0ac71cf)
  • Loading branch information
big-andy-coates committed Jan 10, 2020
1 parent 34a8795 commit 82e8c13
Show file tree
Hide file tree
Showing 9 changed files with 998 additions and 89 deletions.
6 changes: 6 additions & 0 deletions config/ksql-production-server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,23 @@
### HTTP ###
# The URL the KSQL server will listen on:
# The default is any IPv4 interface on the machine.
# NOTE: If set to wildcard or loopback set 'advertised.listener' to enable pull queries across machines
listeners=http://0.0.0.0:8088

# Use the 'listeners' line below for any IPv6 interface on the machine.
# listeners=http://[::]:8088

# If running a multi-node cluster across multiple machines and 'listeners' is set to a wildcard or loopback address
# 'advertised.listener' must be set to the URL other KSQL nodes should use to reach this node.
# advertised.listener=?

### HTTPS ###
# To switch KSQL over to communicating using HTTPS comment out the 'listeners' line above
# uncomment and complete the properties below.
# See: https://docs.confluent.io/current/ksql/docs/installation/server-config/security.html#configuring-ksql-cli-for-https
#
# listeners=https://0.0.0.0:8088
# advertised.listener=?
# ssl.keystore.location=?
# ssl.keystore.password=?
# ssl.key.password=?
Expand Down
6 changes: 6 additions & 0 deletions config/ksql-server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,23 @@
### HTTP ###
# The URL the KSQL server will listen on:
# The default is any IPv4 interface on the machine.
# NOTE: If set to wildcard or loopback set 'advertised.listener' to enable pull queries across machines
listeners=http://0.0.0.0:8088

# Use the 'listeners' line below for any IPv6 interface on the machine.
# listeners=http://[::]:8088

# If running a multi-node cluster across multiple machines and 'listeners' is set to a wildcard or loopback address
# 'advertised.listener' must be set to the URL other KSQL nodes should use to reach this node.
# advertised.listener=?

### HTTPS ###
# To switch KSQL over to communicating using HTTPS comment out the 'listeners' line above
# uncomment and complete the properties below.
# See: https://docs.confluent.io/current/ksql/docs/installation/server-config/security.html#configuring-ksql-cli-for-https
#
# listeners=https://0.0.0.0:8088
# advertised.listener=?
# ssl.keystore.location=?
# ssl.keystore.password=?
# ssl.key.password=?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static java.util.Objects.requireNonNull;

import java.net.URL;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
Expand Down Expand Up @@ -78,6 +79,19 @@ public static <T extends Enum<T>> Validator enumValues(final Class<T> enumClass)
return ValidCaseInsensitiveString.in(validValues);
}

public static Validator validUrl() {
return (name, val) -> {
if (!(val instanceof String)) {
throw new IllegalArgumentException("validator should only be used with STRING defs");
}
try {
new URL((String)val);
} catch (Exception e) {
throw new ConfigException(name, val, "Not valid URL: " + e.getMessage());
}
};
}

public static final class ValidCaseInsensitiveString implements Validator {

private final List<String> validStrings;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,31 @@ public void shouldThrowIfParserThrows() {
validator.ensureValid("propName", "value");
}

@Test
public void shouldThrowOnInvalidURL() {
// Given:
final Validator validator = ConfigValidators.validUrl();

// Then:
expectedException.expect(ConfigException.class);
expectedException.expectMessage(
"Invalid value INVALID for configuration propName: Not valid URL: no protocol: INVALID");

// When:
validator.ensureValid("propName", "INVALID");
}

@Test
public void shouldNotThrowOnValidURL() {
// Given:
final Validator validator = ConfigValidators.validUrl();

// When:
validator.ensureValid("propName", "http://valid:25896/somePath");

// Then: did not throw.
}

private enum TestEnum {
FOO, BAR
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.rest.server;

import static io.confluent.ksql.rest.server.KsqlRestConfig.DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG;
import static io.confluent.rest.RestConfig.LISTENERS_CONFIG;
import static java.util.Objects.requireNonNull;

import com.fasterxml.jackson.jaxrs.base.JsonParseExceptionMapper;
Expand Down Expand Up @@ -73,6 +74,7 @@
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlServerException;
import io.confluent.ksql.util.RetryUtil;
import io.confluent.ksql.util.Version;
import io.confluent.ksql.util.WelcomeMsgUtils;
Expand All @@ -84,6 +86,7 @@
import java.io.Console;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
Expand All @@ -103,6 +106,7 @@
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.websocket.DeploymentException;
import javax.websocket.server.ServerEndpoint;
import javax.websocket.server.ServerEndpointConfig;
Expand Down Expand Up @@ -320,23 +324,46 @@ public void stop() {
}

List<URL> getListeners() {
return Arrays.stream(server.getConnectors())
.filter(connector -> connector instanceof ServerConnector)
.map(ServerConnector.class::cast)
.map(connector -> {
try {
final String protocol = new HashSet<>(connector.getProtocols())
.stream()
.map(String::toLowerCase)
.anyMatch(s -> s.equals("ssl")) ? "https" : "http";

final int localPort = connector.getLocalPort();

return new URL(protocol, "localhost", localPort, "");
} catch (final Exception e) {
throw new RuntimeException("Malformed listener", e);
}
})
final Function<URL, Set<Integer>> resolvePort = url ->
Arrays.stream(server.getConnectors())
.filter(connector -> connector instanceof ServerConnector)
.map(ServerConnector.class::cast)
.filter(connector -> {
final String connectorProtocol = connector.getProtocols().stream()
.map(String::toLowerCase)
.anyMatch(p -> p.equals("ssl")) ? "https" : "http";

return connectorProtocol.equalsIgnoreCase(url.getProtocol());
})
.map(ServerConnector::getLocalPort)
.collect(Collectors.toSet());

final Function<String, Stream<URL>> resolveUrl = listener -> {
try {
final URL url = new URL(listener);
if (url.getPort() != 0) {
return Stream.of(url);
}

// Need to resolve port using actual listeners:
return resolvePort.apply(url).stream()
.map(port -> {
try {
return new URL(url.getProtocol(), url.getHost(), port, url.getFile());
} catch (MalformedURLException e) {
throw new KsqlServerException("Malformed URL specified in '"
+ LISTENERS_CONFIG + "' config: " + listener, e);
}
});
} catch (MalformedURLException e) {
throw new KsqlServerException("Malformed URL specified in '"
+ LISTENERS_CONFIG + "' config: " + listener, e);
}
};

return config.getList(LISTENERS_CONFIG).stream()
.flatMap(resolveUrl)
.distinct()
.collect(Collectors.toList());
}

Expand Down Expand Up @@ -654,21 +681,31 @@ private static void maybeCreateProcessingLogStream(
*
* @return true server config.
*/
private KsqlConfig buildConfigWithPort() {
@VisibleForTesting
KsqlConfig buildConfigWithPort() {
final Map<String, Object> props = ksqlConfigNoPort.originals();

// Wire up KS IQ endpoint discovery to the FIRST listener:
final URL firstListener = getListeners().get(0);
// Wire up KS IQ so that pull queries work across KSQL nodes:
props.put(
KsqlConfig.KSQL_STREAMS_PREFIX + StreamsConfig.APPLICATION_SERVER_CONFIG,
firstListener.toString()
config.getInterNodeListener(this::resolvePort).toString()
);

log.info("Using first listener URL for intra-node communication: {}", firstListener);

return new KsqlConfig(props);
}

private int resolvePort(final URL listener) {
return getListeners().stream()
.filter(l ->
l.getProtocol().equals(listener.getProtocol())
&& l.getHost().equals(listener.getHost())
)
.map(URL::getPort)
.findFirst()
.orElseThrow(() ->
new IllegalStateException("Failed resolve port for listener: " + listener));
}

private static KsqlRestConfig injectPathsWithoutAuthentication(final KsqlRestConfig restConfig) {
final Set<String> authenticationSkipPaths = new HashSet<>(
restConfig.getList(RestConfig.AUTHENTICATION_SKIP_PATHS)
Expand Down
Loading

0 comments on commit 82e8c13

Please sign in to comment.